Merge branch 'master' into dynamic-constraints

This commit is contained in:
Pavel Kruglov 2024-10-07 06:54:10 +02:00 committed by GitHub
commit 1fef195d68
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
301 changed files with 24059 additions and 11352 deletions

View File

@ -4,7 +4,6 @@ self-hosted-runner:
- func-tester
- func-tester-aarch64
- fuzzer-unit-tester
- stress-tester
- style-checker
- style-checker-aarch64
- release-maker

View File

@ -229,18 +229,26 @@ jobs:
uses: ./.github/workflows/reusable_test.yml
with:
test_name: Stress test (tsan)
runner_type: stress-tester
runner_type: func-tester
data: ${{ needs.RunConfig.outputs.data }}
#############################################################################################
############################# INTEGRATION TESTS #############################################
#############################################################################################
IntegrationTestsRelease:
needs: [RunConfig, BuilderDebRelease]
IntegrationTestsAsanOldAnalyzer:
needs: [RunConfig, BuilderDebAsan]
if: ${{ !failure() && !cancelled() }}
uses: ./.github/workflows/reusable_test.yml
with:
test_name: Integration tests (release)
runner_type: stress-tester
test_name: Integration tests (asan, old analyzer)
runner_type: func-tester
data: ${{ needs.RunConfig.outputs.data }}
IntegrationTestsTsan:
needs: [RunConfig, BuilderDebTsan]
if: ${{ !failure() && !cancelled() }}
uses: ./.github/workflows/reusable_test.yml
with:
test_name: Integration tests (tsan)
runner_type: func-tester
data: ${{ needs.RunConfig.outputs.data }}
FinishCheck:
if: ${{ !cancelled() }}
@ -250,7 +258,8 @@ jobs:
- FunctionalStatelessTestAsan
- FunctionalStatefulTestDebug
- StressTestTsan
- IntegrationTestsRelease
- IntegrationTestsTsan
- IntegrationTestsAsanOldAnalyzer
- CompatibilityCheckX86
- CompatibilityCheckAarch64
runs-on: [self-hosted, style-checker]

View File

@ -374,7 +374,7 @@ jobs:
uses: ./.github/workflows/reusable_test.yml
with:
test_name: Stress test (asan)
runner_type: stress-tester
runner_type: func-tester
data: ${{ needs.RunConfig.outputs.data }}
StressTestTsan:
needs: [RunConfig, BuilderDebTsan]
@ -382,7 +382,7 @@ jobs:
uses: ./.github/workflows/reusable_test.yml
with:
test_name: Stress test (tsan)
runner_type: stress-tester
runner_type: func-tester
data: ${{ needs.RunConfig.outputs.data }}
StressTestMsan:
needs: [RunConfig, BuilderDebMsan]
@ -390,7 +390,7 @@ jobs:
uses: ./.github/workflows/reusable_test.yml
with:
test_name: Stress test (msan)
runner_type: stress-tester
runner_type: func-tester
data: ${{ needs.RunConfig.outputs.data }}
StressTestUBsan:
needs: [RunConfig, BuilderDebUBsan]
@ -398,7 +398,7 @@ jobs:
uses: ./.github/workflows/reusable_test.yml
with:
test_name: Stress test (ubsan)
runner_type: stress-tester
runner_type: func-tester
data: ${{ needs.RunConfig.outputs.data }}
StressTestDebug:
needs: [RunConfig, BuilderDebDebug]
@ -406,7 +406,7 @@ jobs:
uses: ./.github/workflows/reusable_test.yml
with:
test_name: Stress test (debug)
runner_type: stress-tester
runner_type: func-tester
data: ${{ needs.RunConfig.outputs.data }}
#############################################################################################
############################# INTEGRATION TESTS #############################################
@ -417,7 +417,7 @@ jobs:
uses: ./.github/workflows/reusable_test.yml
with:
test_name: Integration tests (asan)
runner_type: stress-tester
runner_type: func-tester
data: ${{ needs.RunConfig.outputs.data }}
IntegrationTestsAnalyzerAsan:
needs: [RunConfig, BuilderDebAsan]
@ -425,7 +425,7 @@ jobs:
uses: ./.github/workflows/reusable_test.yml
with:
test_name: Integration tests (asan, old analyzer)
runner_type: stress-tester
runner_type: func-tester
data: ${{ needs.RunConfig.outputs.data }}
IntegrationTestsTsan:
needs: [RunConfig, BuilderDebTsan]
@ -433,7 +433,7 @@ jobs:
uses: ./.github/workflows/reusable_test.yml
with:
test_name: Integration tests (tsan)
runner_type: stress-tester
runner_type: func-tester
data: ${{ needs.RunConfig.outputs.data }}
IntegrationTestsRelease:
needs: [RunConfig, BuilderDebRelease]
@ -441,7 +441,7 @@ jobs:
uses: ./.github/workflows/reusable_test.yml
with:
test_name: Integration tests (release)
runner_type: stress-tester
runner_type: func-tester
data: ${{ needs.RunConfig.outputs.data }}
FinishCheck:
if: ${{ !cancelled() }}

View File

@ -339,7 +339,6 @@ set (CMAKE_ASM_FLAGS_RELWITHDEBINFO "${CMAKE_ASM_FLAGS_RELWITHDEBINFO} -O3
set (CMAKE_ASM_FLAGS_DEBUG "${CMAKE_ASM_FLAGS_DEBUG} -O${DEBUG_O_LEVEL} ${DEBUG_INFO_FLAGS} ${CMAKE_ASM_FLAGS_ADD}")
if (OS_DARWIN)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -stdlib=libc++")
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -Wl,-U,_inside_main")
endif()

View File

@ -1420,8 +1420,6 @@ config
configs
conformant
congruential
conjuction
conjuctive
connectionId
const
contrib
@ -1698,7 +1696,6 @@ formatReadableSize
formatReadableTimeDelta
formatRow
formatRowNoNewline
formated
formatschema
formatter
formatters
@ -3048,3 +3045,89 @@ znode
znodes
zookeeperSessionUptime
zstd
ArrowCompression
CapnProtoEnumComparingMode
DateTimeInputFormat
DateTimeOutputFormat
DateTimeOverflowBehavior
deserialize
dotall
EachRow
EscapingRule
IdentifierQuotingRule
IdentifierQuotingStyle
IntervalOutputFormat
MsgPackUUIDRepresentation
ORCCompression
ParquetCompression
ParquetVersion
SchemaInferenceMode
alloc
CacheWarmer
conjuctive
cors
CORS
countIf
DefaultTableEngine
dereference
DistributedDDLOutputMode
DistributedProductMode
formatdatetime
inequal
INVOKER
ITION
JoinAlgorithm
JoinStrictness
keepalive
ListObject
ListObjects
LoadBalancing
LocalFSReadMethod
LogQueriesType
LogsLevel
MaxThreads
MemorySample
multibuffer
multiif
multiread
multithreading
MySQLDataTypesSupport
nonconst
NonZeroUInt
nullptr
OverflowMode
OverflowModeGroupBy
ParallelReplicasMode
param
parsedatetime
perf
PerfEventInfo
perkey
prefetched
prefetches
prefetching
preimage
QueryCacheNondeterministicFunctionHandling
QueryCacheSystemTableHandling
remerge
replcase
rerange
RetryStrategy
rowlist
SetOperationMode
ShortCircuitFunctionEvaluation
SQLSecurityType
sumIf
TCPHandler
throwif
TotalsMode
TransactionsWaitCSNMode
undelete
unmerged
DataPacket
DDLs
DistributedCacheLogMode
DistributedCachePoolBehaviourOnLimit
SharedJoin
ShareSet
unacked

View File

@ -48,6 +48,8 @@ if (NOT LINKER_NAME)
find_program (LLD_PATH NAMES "ld.lld-${COMPILER_VERSION_MAJOR}" "ld.lld")
elseif (OS_DARWIN)
find_program (LLD_PATH NAMES "ld")
# Duplicate libraries passed to the linker is not a problem.
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -Wl,-no_warn_duplicate_libraries")
endif ()
if (LLD_PATH)
if (OS_LINUX OR OS_DARWIN)

2
contrib/grpc vendored

@ -1 +1 @@
Subproject commit 7bc3abe952aba1dc7bce7f2f790dc781cb51a41e
Subproject commit 62e871c36fa93c0af939bd31762845265214fe3d

2
contrib/libdivide vendored

@ -1 +1 @@
Subproject commit 3bd34388573681ce563348cdf04fe15d24770d04
Subproject commit 01526031eb79375dc85e0212c966d2c514a01234

2
contrib/simdjson vendored

@ -1 +1 @@
Subproject commit 6060be2fdf62edf4a8f51a8b0883d57d09397b30
Subproject commit e341c8b43861b43de29c48ab65f292d997096953

View File

@ -34,7 +34,7 @@ RUN arch=${TARGETARCH:-amd64} \
# lts / testing / prestable / etc
ARG REPO_CHANNEL="stable"
ARG REPOSITORY="https://packages.clickhouse.com/tgz/${REPO_CHANNEL}"
ARG VERSION="24.9.1.3278"
ARG VERSION="24.9.2.42"
ARG PACKAGES="clickhouse-keeper"
ARG DIRECT_DOWNLOAD_URLS=""

View File

@ -35,7 +35,7 @@ RUN arch=${TARGETARCH:-amd64} \
# lts / testing / prestable / etc
ARG REPO_CHANNEL="stable"
ARG REPOSITORY="https://packages.clickhouse.com/tgz/${REPO_CHANNEL}"
ARG VERSION="24.9.1.3278"
ARG VERSION="24.9.2.42"
ARG PACKAGES="clickhouse-client clickhouse-server clickhouse-common-static"
ARG DIRECT_DOWNLOAD_URLS=""

View File

@ -28,7 +28,7 @@ RUN sed -i "s|http://archive.ubuntu.com|${apt_archive}|g" /etc/apt/sources.list
ARG REPO_CHANNEL="stable"
ARG REPOSITORY="deb [signed-by=/usr/share/keyrings/clickhouse-keyring.gpg] https://packages.clickhouse.com/deb ${REPO_CHANNEL} main"
ARG VERSION="24.9.1.3278"
ARG VERSION="24.9.2.42"
ARG PACKAGES="clickhouse-client clickhouse-server clickhouse-common-static"
#docker-official-library:off

View File

@ -0,0 +1 @@
[rabbitmq_consistent_hash_exchange].

View File

@ -13,3 +13,5 @@ ssl_options.fail_if_no_peer_cert = false
ssl_options.cacertfile = /etc/rabbitmq/ca-cert.pem
ssl_options.certfile = /etc/rabbitmq/server-cert.pem
ssl_options.keyfile = /etc/rabbitmq/server-key.pem
vm_memory_high_watermark.absolute = 2GB

View File

@ -41,7 +41,7 @@ sidebar_label: 2022
* Backported in [#25364](https://github.com/ClickHouse/ClickHouse/issues/25364): On ZooKeeper connection loss `ReplicatedMergeTree` table might wait for background operations to complete before trying to reconnect. It's fixed, now background operations are stopped forcefully. [#25306](https://github.com/ClickHouse/ClickHouse/pull/25306) ([Alexander Tokmakov](https://github.com/tavplubix)).
* Backported in [#25387](https://github.com/ClickHouse/ClickHouse/issues/25387): Fix the possibility of non-deterministic behaviour of the `quantileDeterministic` function and similar. This closes [#20480](https://github.com/ClickHouse/ClickHouse/issues/20480). [#25313](https://github.com/ClickHouse/ClickHouse/pull/25313) ([Alexey Milovidov](https://github.com/alexey-milovidov)).
* Backported in [#25455](https://github.com/ClickHouse/ClickHouse/issues/25455): Fix lost `WHERE` condition in expression-push-down optimization of query plan (setting `query_plan_filter_push_down = 1` by default). Fixes [#25368](https://github.com/ClickHouse/ClickHouse/issues/25368). [#25370](https://github.com/ClickHouse/ClickHouse/pull/25370) ([Nikolai Kochetov](https://github.com/KochetovNicolai)).
* Backported in [#25406](https://github.com/ClickHouse/ClickHouse/issues/25406): Fix `REPLACE` column transformer when used in DDL by correctly quoting the formated query. This fixes [#23925](https://github.com/ClickHouse/ClickHouse/issues/23925). [#25391](https://github.com/ClickHouse/ClickHouse/pull/25391) ([Amos Bird](https://github.com/amosbird)).
* Backported in [#25406](https://github.com/ClickHouse/ClickHouse/issues/25406): Fix `REPLACE` column transformer when used in DDL by correctly quoting the formatted query. This fixes [#23925](https://github.com/ClickHouse/ClickHouse/issues/23925). [#25391](https://github.com/ClickHouse/ClickHouse/pull/25391) ([Amos Bird](https://github.com/amosbird)).
* Backported in [#25505](https://github.com/ClickHouse/ClickHouse/issues/25505): Fix segfault when sharding_key is absent in task config for copier. [#25419](https://github.com/ClickHouse/ClickHouse/pull/25419) ([Nikita Mikhaylov](https://github.com/nikitamikhaylov)).
#### NO CL ENTRY

View File

@ -40,7 +40,7 @@ sidebar_label: 2022
* Backported in [#25362](https://github.com/ClickHouse/ClickHouse/issues/25362): On ZooKeeper connection loss `ReplicatedMergeTree` table might wait for background operations to complete before trying to reconnect. It's fixed, now background operations are stopped forcefully. [#25306](https://github.com/ClickHouse/ClickHouse/pull/25306) ([Alexander Tokmakov](https://github.com/tavplubix)).
* Backported in [#25386](https://github.com/ClickHouse/ClickHouse/issues/25386): Fix the possibility of non-deterministic behaviour of the `quantileDeterministic` function and similar. This closes [#20480](https://github.com/ClickHouse/ClickHouse/issues/20480). [#25313](https://github.com/ClickHouse/ClickHouse/pull/25313) ([Alexey Milovidov](https://github.com/alexey-milovidov)).
* Backported in [#25456](https://github.com/ClickHouse/ClickHouse/issues/25456): Fix lost `WHERE` condition in expression-push-down optimization of query plan (setting `query_plan_filter_push_down = 1` by default). Fixes [#25368](https://github.com/ClickHouse/ClickHouse/issues/25368). [#25370](https://github.com/ClickHouse/ClickHouse/pull/25370) ([Nikolai Kochetov](https://github.com/KochetovNicolai)).
* Backported in [#25408](https://github.com/ClickHouse/ClickHouse/issues/25408): Fix `REPLACE` column transformer when used in DDL by correctly quoting the formated query. This fixes [#23925](https://github.com/ClickHouse/ClickHouse/issues/23925). [#25391](https://github.com/ClickHouse/ClickHouse/pull/25391) ([Amos Bird](https://github.com/amosbird)).
* Backported in [#25408](https://github.com/ClickHouse/ClickHouse/issues/25408): Fix `REPLACE` column transformer when used in DDL by correctly quoting the formatted query. This fixes [#23925](https://github.com/ClickHouse/ClickHouse/issues/23925). [#25391](https://github.com/ClickHouse/ClickHouse/pull/25391) ([Amos Bird](https://github.com/amosbird)).
* Backported in [#25504](https://github.com/ClickHouse/ClickHouse/issues/25504): Fix segfault when sharding_key is absent in task config for copier. [#25419](https://github.com/ClickHouse/ClickHouse/pull/25419) ([Nikita Mikhaylov](https://github.com/nikitamikhaylov)).
#### NO CL ENTRY

View File

@ -24,7 +24,7 @@ sidebar_label: 2022
* Backported in [#25363](https://github.com/ClickHouse/ClickHouse/issues/25363): On ZooKeeper connection loss `ReplicatedMergeTree` table might wait for background operations to complete before trying to reconnect. It's fixed, now background operations are stopped forcefully. [#25306](https://github.com/ClickHouse/ClickHouse/pull/25306) ([Alexander Tokmakov](https://github.com/tavplubix)).
* Backported in [#25388](https://github.com/ClickHouse/ClickHouse/issues/25388): Fix the possibility of non-deterministic behaviour of the `quantileDeterministic` function and similar. This closes [#20480](https://github.com/ClickHouse/ClickHouse/issues/20480). [#25313](https://github.com/ClickHouse/ClickHouse/pull/25313) ([Alexey Milovidov](https://github.com/alexey-milovidov)).
* Backported in [#25448](https://github.com/ClickHouse/ClickHouse/issues/25448): Fix lost `WHERE` condition in expression-push-down optimization of query plan (setting `query_plan_filter_push_down = 1` by default). Fixes [#25368](https://github.com/ClickHouse/ClickHouse/issues/25368). [#25370](https://github.com/ClickHouse/ClickHouse/pull/25370) ([Nikolai Kochetov](https://github.com/KochetovNicolai)).
* Backported in [#25407](https://github.com/ClickHouse/ClickHouse/issues/25407): Fix `REPLACE` column transformer when used in DDL by correctly quoting the formated query. This fixes [#23925](https://github.com/ClickHouse/ClickHouse/issues/23925). [#25391](https://github.com/ClickHouse/ClickHouse/pull/25391) ([Amos Bird](https://github.com/amosbird)).
* Backported in [#25407](https://github.com/ClickHouse/ClickHouse/issues/25407): Fix `REPLACE` column transformer when used in DDL by correctly quoting the formatted query. This fixes [#23925](https://github.com/ClickHouse/ClickHouse/issues/23925). [#25391](https://github.com/ClickHouse/ClickHouse/pull/25391) ([Amos Bird](https://github.com/amosbird)).
#### NOT FOR CHANGELOG / INSIGNIFICANT

View File

@ -133,7 +133,7 @@ sidebar_label: 2022
* On ZooKeeper connection loss `ReplicatedMergeTree` table might wait for background operations to complete before trying to reconnect. It's fixed, now background operations are stopped forcefully. [#25306](https://github.com/ClickHouse/ClickHouse/pull/25306) ([Alexander Tokmakov](https://github.com/tavplubix)).
* Fix the possibility of non-deterministic behaviour of the `quantileDeterministic` function and similar. This closes [#20480](https://github.com/ClickHouse/ClickHouse/issues/20480). [#25313](https://github.com/ClickHouse/ClickHouse/pull/25313) ([Alexey Milovidov](https://github.com/alexey-milovidov)).
* Fix lost `WHERE` condition in expression-push-down optimization of query plan (setting `query_plan_filter_push_down = 1` by default). Fixes [#25368](https://github.com/ClickHouse/ClickHouse/issues/25368). [#25370](https://github.com/ClickHouse/ClickHouse/pull/25370) ([Nikolai Kochetov](https://github.com/KochetovNicolai)).
* Fix `REPLACE` column transformer when used in DDL by correctly quoting the formated query. This fixes [#23925](https://github.com/ClickHouse/ClickHouse/issues/23925). [#25391](https://github.com/ClickHouse/ClickHouse/pull/25391) ([Amos Bird](https://github.com/amosbird)).
* Fix `REPLACE` column transformer when used in DDL by correctly quoting the formatted query. This fixes [#23925](https://github.com/ClickHouse/ClickHouse/issues/23925). [#25391](https://github.com/ClickHouse/ClickHouse/pull/25391) ([Amos Bird](https://github.com/amosbird)).
* Fix segfault when sharding_key is absent in task config for copier. [#25419](https://github.com/ClickHouse/ClickHouse/pull/25419) ([Nikita Mikhaylov](https://github.com/nikitamikhaylov)).
* Fix excessive underscore before the names of the preprocessed configuration files. [#25431](https://github.com/ClickHouse/ClickHouse/pull/25431) ([Vitaly Baranov](https://github.com/vitlibar)).
* Fix convertion of datetime with timezone for MySQL, PostgreSQL, ODBC. Closes [#5057](https://github.com/ClickHouse/ClickHouse/issues/5057). [#25528](https://github.com/ClickHouse/ClickHouse/pull/25528) ([Kseniia Sumarokova](https://github.com/kssenii)).

View File

@ -0,0 +1,33 @@
---
sidebar_position: 1
sidebar_label: 2024
---
# 2024 Changelog
### ClickHouse release v24.9.2.42-stable (de7c791a2ea) FIXME as compared to v24.9.1.3278-stable (6d058d82a8e)
#### Improvement
* Backported in [#70091](https://github.com/ClickHouse/ClickHouse/issues/70091): Add `show_create_query_identifier_quoting_rule` to define identifier quoting behavior of the show create query result. Possible values: - `user_display`: When the identifiers is a keyword. - `when_necessary`: When the identifiers is one of `{"distinct", "all", "table"}`, or it can cause ambiguity: column names, dictionary attribute names. - `always`: Always quote identifiers. [#69448](https://github.com/ClickHouse/ClickHouse/pull/69448) ([tuanpach](https://github.com/tuanpach)).
* Backported in [#70100](https://github.com/ClickHouse/ClickHouse/issues/70100): Follow-up to https://github.com/ClickHouse/ClickHouse/pull/69346 Point 4 described there will work now as well:. [#69563](https://github.com/ClickHouse/ClickHouse/pull/69563) ([Vitaly Baranov](https://github.com/vitlibar)).
* Backported in [#70048](https://github.com/ClickHouse/ClickHouse/issues/70048): Add new column readonly_duration to the system.replicas table. Needed to be able to distinguish actual readonly replicas from sentinel ones in alerts. [#69871](https://github.com/ClickHouse/ClickHouse/pull/69871) ([Miсhael Stetsyuk](https://github.com/mstetsyuk)).
#### Bug Fix (user-visible misbehavior in an official stable release)
* Backported in [#70193](https://github.com/ClickHouse/ClickHouse/issues/70193): Fix crash when executing `create view t as (with recursive 42 as ttt select ttt);`. [#69676](https://github.com/ClickHouse/ClickHouse/pull/69676) ([Han Fei](https://github.com/hanfei1991)).
* Backported in [#70083](https://github.com/ClickHouse/ClickHouse/issues/70083): Closes [#69752](https://github.com/ClickHouse/ClickHouse/issues/69752). [#69985](https://github.com/ClickHouse/ClickHouse/pull/69985) ([pufit](https://github.com/pufit)).
* Backported in [#70070](https://github.com/ClickHouse/ClickHouse/issues/70070): Fixes `Block structure mismatch` for queries with nested views and `WHERE` condition. Fixes [#66209](https://github.com/ClickHouse/ClickHouse/issues/66209). [#70054](https://github.com/ClickHouse/ClickHouse/pull/70054) ([Nikolai Kochetov](https://github.com/KochetovNicolai)).
* Backported in [#70168](https://github.com/ClickHouse/ClickHouse/issues/70168): Fix wrong LOGICAL_ERROR when replacing literals in ranges. [#70122](https://github.com/ClickHouse/ClickHouse/pull/70122) ([Pablo Marcos](https://github.com/pamarcos)).
* Backported in [#70238](https://github.com/ClickHouse/ClickHouse/issues/70238): Check for Nullable(Nothing) type during ALTER TABLE MODIFY COLUMN/QUERY to prevent tables with such data type. [#70123](https://github.com/ClickHouse/ClickHouse/pull/70123) ([Pavel Kruglov](https://github.com/Avogar)).
* Backported in [#70205](https://github.com/ClickHouse/ClickHouse/issues/70205): Fix wrong result with skipping index. [#70127](https://github.com/ClickHouse/ClickHouse/pull/70127) ([Raúl Marín](https://github.com/Algunenano)).
* Backported in [#70185](https://github.com/ClickHouse/ClickHouse/issues/70185): Fix data race in ColumnObject/ColumnTuple decompress method that could lead to heap use after free. [#70137](https://github.com/ClickHouse/ClickHouse/pull/70137) ([Pavel Kruglov](https://github.com/Avogar)).
* Backported in [#70253](https://github.com/ClickHouse/ClickHouse/issues/70253): Fix possible hung in ALTER COLUMN with Dynamic type. [#70144](https://github.com/ClickHouse/ClickHouse/pull/70144) ([Pavel Kruglov](https://github.com/Avogar)).
* Backported in [#70230](https://github.com/ClickHouse/ClickHouse/issues/70230): Use correct `max_types` parameter during Dynamic type creation for JSON subcolumn. [#70147](https://github.com/ClickHouse/ClickHouse/pull/70147) ([Pavel Kruglov](https://github.com/Avogar)).
* Backported in [#70217](https://github.com/ClickHouse/ClickHouse/issues/70217): Fix the password being displayed in `system.query_log` for users with bcrypt password authentication method. [#70148](https://github.com/ClickHouse/ClickHouse/pull/70148) ([Nikolay Degterinsky](https://github.com/evillique)).
* Backported in [#70267](https://github.com/ClickHouse/ClickHouse/issues/70267): Respect setting allow_simdjson in JSON type parser. [#70218](https://github.com/ClickHouse/ClickHouse/pull/70218) ([Pavel Kruglov](https://github.com/Avogar)).
#### NOT FOR CHANGELOG / INSIGNIFICANT
* Backported in [#70052](https://github.com/ClickHouse/ClickHouse/issues/70052): Improve stateless test runner. [#69864](https://github.com/ClickHouse/ClickHouse/pull/69864) ([Alexey Katsman](https://github.com/alexkats)).
* Backported in [#70284](https://github.com/ClickHouse/ClickHouse/issues/70284): Improve pipdeptree generator for docker images. - Update requirements.txt for the integration tests runner container - Remove some small dependencies, improve `helpers/retry_decorator.py` - Upgrade docker-compose from EOL version 1 to version 2. [#70146](https://github.com/ClickHouse/ClickHouse/pull/70146) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).
* Backported in [#70261](https://github.com/ClickHouse/ClickHouse/issues/70261): Update test_storage_s3_queue/test.py. [#70159](https://github.com/ClickHouse/ClickHouse/pull/70159) ([Kseniia Sumarokova](https://github.com/kssenii)).

View File

@ -195,6 +195,9 @@ You can pass parameters to `clickhouse-client` (all parameters have a default va
- `--print-profile-events` Print `ProfileEvents` packets.
- `--profile-events-delay-ms` Delay between printing `ProfileEvents` packets (-1 - print only totals, 0 - print every single packet).
- `--jwt` If specified, enables authorization via JSON Web Token. Server JWT authorization is available only in ClickHouse Cloud.
- `--progress` Print progress of query execution. Possible values: 'tty|on|1|true|yes' - outputs to TTY in interactive mode; 'err' - outputs to STDERR non-interactive mode; 'off|0|false|no' - disables the progress printing. Default: TTY in interactive mode, disabled in non-interactive.
- `--progress-table` Print a progress table with changing metrics during query execution. Possible values: 'tty|on|1|true|yes' - outputs to TTY in interactive mode; 'err' - outputs to STDERR non-interactive mode; 'off|0|false|no' - disables the progress table. Default: TTY in interactive mode, disabled in non-interactive.
- `--enable-progress-table-toggle` Enable toggling of the progress table by pressing the control key (Space). Only applicable in interactive mode with the progress table printing enabled. Default: 'true'.
Instead of `--host`, `--port`, `--user` and `--password` options, ClickHouse client also supports connection strings (see next section).

View File

@ -1057,12 +1057,12 @@ Default value: throw
## deduplicate_merge_projection_mode
Whether to allow create projection for the table with non-classic MergeTree, that is not (Replicated, Shared) MergeTree. If allowed, what is the action when merge projections, either drop or rebuild. So classic MergeTree would ignore this setting.
Whether to allow create projection for the table with non-classic MergeTree, that is not (Replicated, Shared) MergeTree. Ignore option is purely for compatibility which might result in incorrect answer. Otherwise, if allowed, what is the action when merge projections, either drop or rebuild. So classic MergeTree would ignore this setting.
It also controls `OPTIMIZE DEDUPLICATE` as well, but has effect on all MergeTree family members. Similar to the option `lightweight_mutation_projection_mode`, it is also part level.
Possible values:
- throw, drop, rebuild
- ignore, throw, drop, rebuild
Default value: throw

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -10,21 +10,21 @@ Columns:
- `database` ([String](../../sql-reference/data-types/string.md)) — The name of the database the table is in.
- `view` ([String](../../sql-reference/data-types/string.md)) — Table name.
- `uuid` ([UUID](../../sql-reference/data-types/uuid.md)) — Table uuid (Atomic database).
- `status` ([String](../../sql-reference/data-types/string.md)) — Current state of the refresh.
- `last_refresh_result` ([String](../../sql-reference/data-types/string.md)) — Outcome of the latest refresh attempt.
- `last_refresh_time` ([DateTime](../../sql-reference/data-types/datetime.md)) — Time of the last refresh attempt. `NULL` if no refresh attempts happened since server startup or table creation.
- `last_success_time` ([DateTime](../../sql-reference/data-types/datetime.md)) — Time of the last successful refresh. `NULL` if no successful refreshes happened since server startup or table creation.
- `duration_ms` ([UInt64](../../sql-reference/data-types/int-uint.md)) — How long the last refresh attempt took.
- `next_refresh_time` ([DateTime](../../sql-reference/data-types/datetime.md)) — Time at which the next refresh is scheduled to start.
- `remaining_dependencies` ([Array(String)](../../sql-reference/data-types/array.md)) — If the view has [refresh dependencies](../../sql-reference/statements/create/view.md#refresh-dependencies), this array contains the subset of those dependencies that are not satisfied for the current refresh yet. If `status = 'WaitingForDependencies'`, a refresh is ready to start as soon as these dependencies are fulfilled.
- `exception` ([String](../../sql-reference/data-types/string.md)) — if `last_refresh_result = 'Error'`, i.e. the last refresh attempt failed, this column contains the corresponding error message and stack trace.
- `retry` ([UInt64](../../sql-reference/data-types/int-uint.md)) — If nonzero, the current or next refresh is a retry (see `refresh_retries` refresh setting), and `retry` is the 1-based index of that retry.
- `refresh_count` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Number of successful refreshes since last server restart or table creation.
- `progress` ([Float64](../../sql-reference/data-types/float.md)) — Progress of the current refresh, between 0 and 1.
- `read_rows` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Number of rows read by the current refresh so far.
- `total_rows` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Estimated total number of rows that need to be read by the current refresh.
(There are additional columns related to current refresh progress, but they are currently unreliable.)
- `last_success_time` ([Nullable](../../sql-reference/data-types/nullable.md)([DateTime](../../sql-reference/data-types/datetime.md))) — Time when the latest successful refresh started. NULL if no successful refreshes happened since server startup or table creation.
- `last_success_duration_ms` ([Nullable](../../sql-reference/data-types/nullable.md)([UInt64](../../sql-reference/data-types/int-uint.md))) — How long the latest refresh took.
- `last_refresh_time` ([Nullable](../../sql-reference/data-types/nullable.md)([DateTime](../../sql-reference/data-types/datetime.md))) — Time when the latest refresh attempt finished (if known) or started (if unknown or still running). NULL if no refresh attempts happened since server startup or table creation.
- `last_refresh_replica` ([String](../../sql-reference/data-types/string.md)) — If coordination is enabled, name of the replica that made the current (if running) or previous (if not running) refresh attempt.
- `next_refresh_time` ([Nullable](../../sql-reference/data-types/nullable.md)([DateTime](../../sql-reference/data-types/datetime.md))) — Time at which the next refresh is scheduled to start, if status = Scheduled.
- `exception` ([String](../../sql-reference/data-types/string.md)) — Error message from previous attempt if it failed.
- `retry` ([UInt64](../../sql-reference/data-types/int-uint.md)) — How many failed attempts there were so far, for the current refresh.
- `progress` ([Float64](../../sql-reference/data-types/float.md)) — Progress of the current refresh, between 0 and 1. Not available if status is `RunningOnAnotherReplica`.
- `read_rows` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Number of rows read by the current refresh so far. Not available if status is `RunningOnAnotherReplica`.
- `read_bytes` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Number of bytes read during the current refresh. Not available if status is `RunningOnAnotherReplica`.
- `total_rows` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Estimated total number of rows that need to be read by the current refresh. Not available if status is `RunningOnAnotherReplica`.
- `written_rows` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Number of rows written during the current refresh. Not available if status is `RunningOnAnotherReplica`.
- `written_bytes` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Number rof bytes written during the current refresh. Not available if status is `RunningOnAnotherReplica`.
**Example**

View File

@ -226,9 +226,9 @@ Result:
## bitTestAll
Returns result of [logical conjuction](https://en.wikipedia.org/wiki/Logical_conjunction) (AND operator) of all bits at given positions. Counting is right-to-left, starting at 0.
Returns result of [logical conjunction](https://en.wikipedia.org/wiki/Logical_conjunction) (AND operator) of all bits at given positions. Counting is right-to-left, starting at 0.
The conjuction for bit-wise operations:
The conjunction for bit-wise operations:
0 AND 0 = 0
@ -251,7 +251,7 @@ SELECT bitTestAll(number, index1, index2, index3, index4, ...)
**Returned value**
- Result of the logical conjuction. [UInt8](../data-types/int-uint.md).
- Result of the logical conjunction. [UInt8](../data-types/int-uint.md).
**Example**

View File

@ -316,6 +316,38 @@ Result:
Same as `toIPv4`, but if the IPv4 address has an invalid format, it returns null.
**Syntax**
```sql
toIPv4OrNull(value)
```
**Arguments**
- `value` — The value with IPv4 address.
**Returned value**
- `value` converted to the current IPv4 address. [String](../data-types/string.md).
**Example**
Query:
```sql
SELECT
toIPv4OrNull('192.168.0.1') AS s1,
toIPv4OrNull('192.168.0') AS s2
```
Result:
```response
┌─s1──────────┬─s2───┐
│ 192.168.0.1 │ ᴺᵁᴸᴸ │
└─────────────┴──────┘
```
## toIPv6OrDefault(string)
Same as `toIPv6`, but if the IPv6 address has an invalid format, it returns `::` (0 IPv6).

View File

@ -135,15 +135,15 @@ To change SQL security for an existing view, use
ALTER TABLE MODIFY SQL SECURITY { DEFINER | INVOKER | NONE } [DEFINER = { user | CURRENT_USER }]
```
### Examples sql security
### Examples
```sql
CREATE test_view
CREATE VIEW test_view
DEFINER = alice SQL SECURITY DEFINER
AS SELECT ...
```
```sql
CREATE test_view
CREATE VIEW test_view
SQL SECURITY INVOKER
AS SELECT ...
```
@ -184,14 +184,6 @@ Differences from regular non-refreshable materialized views:
The settings in the `REFRESH ... SETTINGS` part of the query are refresh settings (e.g. `refresh_retries`), distinct from regular settings (e.g. `max_threads`). Regular settings can be specified using `SETTINGS` at the end of the query.
:::
:::note
Refreshable materialized views are a work in progress. Setting `allow_experimental_refreshable_materialized_view = 1` is required for creating one. Current limitations:
* not compatible with Replicated database or table engines
* It is not supported in ClickHouse Cloud
* require [Atomic database engine](../../../engines/database-engines/atomic.md),
* no limit on number of concurrent refreshes.
:::
### Refresh Schedule
Example refresh schedules:
@ -203,6 +195,10 @@ REFRESH EVERY 2 WEEK OFFSET 5 DAY 15 HOUR 10 MINUTE -- every other Saturday, at
REFRESH EVERY 30 MINUTE -- at 00:00, 00:30, 01:00, 01:30, etc
REFRESH AFTER 30 MINUTE -- 30 minutes after the previous refresh completes, no alignment with time of day
-- REFRESH AFTER 1 HOUR OFFSET 1 MINUTE -- syntax errror, OFFSET is not allowed with AFTER
REFRESH EVERY 1 WEEK 2 DAYS -- every 9 days, not on any particular day of the week or month;
-- specifically, when day number (since 1969-12-29) is divisible by 9
REFRESH EVERY 5 MONTHS -- every 5 months, different months each year (as 12 is not divisible by 5);
-- specifically, when month number (since 1970-01) is divisible by 5
```
`RANDOMIZE FOR` randomly adjusts the time of each refresh, e.g.:
@ -214,6 +210,16 @@ At most one refresh may be running at a time, for a given view. E.g. if a view w
Additionally, a refresh is started immediately after the materialized view is created, unless `EMPTY` is specified in the `CREATE` query. If `EMPTY` is specified, the first refresh happens according to schedule.
### In Replicated DB
If the refreshable materialized view is in a [Replicated database](../../../engines/database-engines/replicated.md), the replicas coordinate with each other such that only one replica performs the refresh at each scheduled time. [ReplicatedMergeTree](../../../engines/table-engines/mergetree-family/replication.md) table engine is required, so that all replicas see the data produced by the refresh.
In `APPEND` mode, coordination can be disabled using `SETTINGS all_replicas = 1`. This makes replicas do refreshes independently of each other. In this case ReplicatedMergeTree is not required.
In non-`APPEND` mode, only coordinated refreshing is supported. For uncoordinated, use `Atomic` database and `CREATE ... ON CLUSTER` query to create refreshable materialized views on all replicas.
The coordination is done through Keeper. The znode path is determined by [default_replica_path](../../../operations/server-configuration-parameters/settings.md#default_replica_path) server setting.
### Dependencies {#refresh-dependencies}
`DEPENDS ON` synchronizes refreshes of different tables. By way of example, suppose there's a chain of two refreshable materialized views:
@ -277,6 +283,8 @@ The status of all refreshable materialized views is available in table [`system.
To manually stop, start, trigger, or cancel refreshes use [`SYSTEM STOP|START|REFRESH|CANCEL VIEW`](../system.md#refreshable-materialized-views).
To wait for a refresh to complete, use [`SYSTEM WAIT VIEW`](../system.md#refreshable-materialized-views). In particular, useful for waiting for initial refresh after creating a view.
:::note
Fun fact: the refresh query is allowed to read from the view that's being refreshed, seeing pre-refresh version of the data. This means you can implement Conway's game of life: https://pastila.nl/?00021a4b/d6156ff819c83d490ad2dcec05676865#O0LGWTO7maUQIA4AcGUtlA==
:::

View File

@ -233,15 +233,20 @@ Hierarchy of privileges:
- `addressToSymbol`
- `demangle`
- [SOURCES](#sources)
- `AZURE`
- `FILE`
- `URL`
- `REMOTE`
- `YSQL`
- `ODBC`
- `JDBC`
- `HDFS`
- `S3`
- `HIVE`
- `JDBC`
- `MONGO`
- `MYSQL`
- `ODBC`
- `POSTGRES`
- `REDIS`
- `REMOTE`
- `S3`
- `SQLITE`
- `URL`
- [dictGet](#dictget)
- [displaySecretsInShowAndSelect](#displaysecretsinshowandselect)
- [NAMED COLLECTION ADMIN](#named-collection-admin)
@ -510,15 +515,20 @@ Allows using [introspection](../../operations/optimizing-performance/sampling-qu
Allows using external data sources. Applies to [table engines](../../engines/table-engines/index.md) and [table functions](../../sql-reference/table-functions/index.md#table-functions).
- `SOURCES`. Level: `GROUP`
- `AZURE`. Level: `GLOBAL`
- `FILE`. Level: `GLOBAL`
- `URL`. Level: `GLOBAL`
- `REMOTE`. Level: `GLOBAL`
- `YSQL`. Level: `GLOBAL`
- `ODBC`. Level: `GLOBAL`
- `JDBC`. Level: `GLOBAL`
- `HDFS`. Level: `GLOBAL`
- `S3`. Level: `GLOBAL`
- `HIVE`. Level: `GLOBAL`
- `JDBC`. Level: `GLOBAL`
- `MONGO`. Level: `GLOBAL`
- `MYSQL`. Level: `GLOBAL`
- `ODBC`. Level: `GLOBAL`
- `POSTGRES`. Level: `GLOBAL`
- `REDIS`. Level: `GLOBAL`
- `REMOTE`. Level: `GLOBAL`
- `S3`. Level: `GLOBAL`
- `SQLITE`. Level: `GLOBAL`
- `URL`. Level: `GLOBAL`
The `SOURCES` privilege enables use of all the sources. Also you can grant a privilege for each source individually. To use sources, you need additional privileges.

View File

@ -565,3 +565,13 @@ If there's a refresh in progress for the given view, interrupt and cancel it. Ot
```sql
SYSTEM CANCEL VIEW [db.]name
```
### SYSTEM WAIT VIEW
Waits for the running refresh to complete. If no refresh is running, returns immediately. If the latest refresh attempt failed, reports an error.
Can be used right after creating a new refreshable materialized view (without EMPTY keyword) to wait for the initial refresh to complete.
```sql
SYSTEM WAIT VIEW [db.]name
```

View File

@ -220,7 +220,7 @@ SELECT bitTest(43, 2);
## bitTestAll {#bittestall}
返回给定位置所有位的 [logical conjuction](https://en.wikipedia.org/wiki/Logical_conjunction) 进行与操作的结果。位值从右到左数从0开始计数。
返回给定位置所有位的 [logical conjunction](https://en.wikipedia.org/wiki/Logical_conjunction) 进行与操作的结果。位值从右到左数从0开始计数。
与运算的结果:

View File

@ -346,7 +346,9 @@ try
processConfig();
adjustSettings();
initTTYBuffer(toProgressOption(config().getString("progress", "default")));
initTTYBuffer(toProgressOption(config().getString("progress", "default")),
toProgressOption(config().getString("progress-table", "default")));
initKeystrokeInterceptor();
ASTAlterCommand::setFormatAlterCommandsWithParentheses(true);
{
@ -772,7 +774,7 @@ bool Client::processWithFuzzing(const String & full_query)
else
this_query_runs = 1;
}
else if (const auto * insert = orig_ast->as<ASTInsertQuery>())
else if (const auto * /*insert*/ _ = orig_ast->as<ASTInsertQuery>())
{
this_query_runs = 1;
queries_for_fuzzed_tables = fuzzer.getInsertQueriesForFuzzedTables(full_query);

View File

@ -518,7 +518,9 @@ try
SCOPE_EXIT({ cleanup(); });
initTTYBuffer(toProgressOption(getClientConfiguration().getString("progress", "default")));
initTTYBuffer(toProgressOption(getClientConfiguration().getString("progress", "default")),
toProgressOption(config().getString("progress-table", "default")));
initKeystrokeInterceptor();
ASTAlterCommand::setFormatAlterCommandsWithParentheses(true);
/// try to load user defined executable functions, throw on error and die

View File

@ -158,6 +158,11 @@ namespace Setting
extern const SettingsSeconds send_timeout;
}
namespace MergeTreeSetting
{
extern const MergeTreeSettingsBool allow_remote_fs_zero_copy_replication;
}
}
namespace CurrentMetrics
@ -599,7 +604,7 @@ void sanityChecks(Server & server)
{
}
if (server.context()->getMergeTreeSettings().allow_remote_fs_zero_copy_replication)
if (server.context()->getMergeTreeSettings()[MergeTreeSetting::allow_remote_fs_zero_copy_replication])
{
server.context()->addWarningMessage("The setting 'allow_remote_fs_zero_copy_replication' is enabled for MergeTree tables."
" But the feature of 'zero-copy replication' is under development and is not ready for production."
@ -628,7 +633,9 @@ void loadStartupScripts(const Poco::Util::AbstractConfiguration & config, Contex
auto condition_write_buffer = WriteBufferFromOwnString();
LOG_DEBUG(log, "Checking startup query condition `{}`", condition);
executeQuery(condition_read_buffer, condition_write_buffer, true, context, callback, QueryFlags{ .internal = true }, std::nullopt, {});
auto startup_context = Context::createCopy(context);
startup_context->makeQueryContext();
executeQuery(condition_read_buffer, condition_write_buffer, true, startup_context, callback, QueryFlags{ .internal = true }, std::nullopt, {});
auto result = condition_write_buffer.str();
@ -648,7 +655,9 @@ void loadStartupScripts(const Poco::Util::AbstractConfiguration & config, Contex
auto write_buffer = WriteBufferFromOwnString();
LOG_DEBUG(log, "Executing query `{}`", query);
executeQuery(read_buffer, write_buffer, true, context, callback, QueryFlags{ .internal = true }, std::nullopt, {});
auto startup_context = Context::createCopy(context);
startup_context->makeQueryContext();
executeQuery(read_buffer, write_buffer, true, startup_context, callback, QueryFlags{ .internal = true }, std::nullopt, {});
}
}
catch (...)
@ -1125,9 +1134,6 @@ try
/// We need to reload server settings because config could be updated via zookeeper.
server_settings.loadSettingsFromConfig(config());
/// NOTE: Do sanity checks after we loaded all possible substitutions (for the configuration) from ZK
sanityChecks(*this);
#if defined(OS_LINUX)
std::string executable_path = getExecutablePath();
@ -2019,6 +2025,11 @@ try
if (!filesystem_caches_path.empty())
global_context->setFilesystemCachesPath(filesystem_caches_path);
/// NOTE: Do sanity checks after we loaded all possible substitutions (for the configuration) from ZK
/// Additionally, making the check after the default profile is initialized.
/// It is important to initialize MergeTreeSettings after Settings, to support compatibility for MergeTreeSettings.
sanityChecks(*this);
/// Check sanity of MergeTreeSettings on server startup
{
size_t background_pool_tasks = global_context->getMergeMutateExecutor()->getMaxTasksCount();

View File

@ -22,8 +22,10 @@
#include <Backups/RestorerFromBackup.h>
#include <Core/Settings.h>
#include <base/defines.h>
#include <base/range.h>
#include <IO/Operators.h>
#include <Common/re2.h>
#include <Poco/AccessExpireCache.h>
#include <boost/algorithm/string/join.hpp>
#include <filesystem>
@ -133,7 +135,7 @@ public:
String{setting_name}, boost::algorithm::join(registered_prefixes, "' or '"));
}
else
BaseSettingsHelpers::throwSettingNotFound(setting_name);
throw Exception(ErrorCodes::UNKNOWN_SETTING, "Unknown setting '{}'", String{setting_name});
}
private:

View File

@ -1,18 +1,17 @@
#pragma once
#include <Access/AccessRights.h>
#include <Access/ContextAccessParams.h>
#include <Access/EnabledRowPolicies.h>
#include <Interpreters/ClientInfo.h>
#include <Access/QuotaUsage.h>
#include <Common/SettingsChanges.h>
#include <Core/UUID.h>
#include <base/scope_guard.h>
#include <boost/container/flat_set.hpp>
#include <memory>
#include <mutex>
#include <optional>
#include <unordered_map>
#include <Access/AccessRights.h>
#include <Access/ContextAccessParams.h>
#include <Access/EnabledRowPolicies.h>
#include <Access/QuotaUsage.h>
#include <Core/UUID.h>
#include <Interpreters/ClientInfo.h>
#include <base/scope_guard.h>
#include <Common/SettingsChanges.h>
namespace Poco { class Logger; }

View File

@ -1,9 +1,8 @@
#pragma once
#include <base/types.h>
#include <boost/container/flat_set.hpp>
#include <Access/Common/SSLCertificateSubjects.h>
#include <memory>
#include <Access/Common/SSLCertificateSubjects.h>
#include <base/types.h>
#include "config.h"

View File

@ -1,10 +1,9 @@
#pragma once
#include <Access/EnabledRoles.h>
#include <Poco/AccessExpireCache.h>
#include <boost/container/flat_set.hpp>
#include <map>
#include <mutex>
#include <Access/EnabledRoles.h>
#include <Poco/AccessExpireCache.h>
namespace DB

View File

@ -431,8 +431,8 @@ SettingsConstraints::Checker SettingsConstraints::getMergeTreeChecker(std::strin
auto full_name = settingFullName<MergeTreeSettings>(short_name);
auto it = constraints.find(resolveSettingNameWithCache(full_name));
if (it == constraints.end())
return Checker(MergeTreeSettings::Traits::resolveName); // Allowed
return Checker(it->second, MergeTreeSettings::Traits::resolveName);
return Checker(MergeTreeSettings::resolveName); // Allowed
return Checker(it->second, MergeTreeSettings::resolveName);
}
bool SettingsConstraints::Constraint::operator==(const Constraint & other) const

View File

@ -28,6 +28,7 @@
#include <cstring>
#include <filesystem>
#include <base/FnTraits.h>
#include <base/range.h>
namespace DB

View File

@ -298,12 +298,13 @@ public:
Field value = values[col_idx];
/// Compatibility with previous versions.
if (value.getType() == Field::Types::Decimal32)
WhichDataType value_type(values_types[col_idx]);
if (value_type.isDecimal32())
{
auto source = value.safeGet<DecimalField<Decimal32>>();
value = DecimalField<Decimal128>(source.getValue(), source.getScale());
}
else if (value.getType() == Field::Types::Decimal64)
else if (value_type.isDecimal64())
{
auto source = value.safeGet<DecimalField<Decimal64>>();
value = DecimalField<Decimal128>(source.getValue(), source.getScale());
@ -545,7 +546,28 @@ public:
}
}
bool keepKey(const Field & key) const { return keys_to_keep.contains(key); }
bool keepKey(const Field & key) const
{
if (keys_to_keep.contains(key))
return true;
// Determine whether the numerical value of the key can have both types (UInt or Int),
// and use the other type with the same numerical value for keepKey verification.
if (key.getType() == Field::Types::UInt64)
{
const auto & value = key.safeGet<const UInt64 &>();
if (value <= std::numeric_limits<Int64>::max())
return keys_to_keep.contains(Field(Int64(value)));
}
else if (key.getType() == Field::Types::Int64)
{
const auto & value = key.safeGet<const Int64 &>();
if (value >= 0)
return keys_to_keep.contains(Field(UInt64(value)));
}
return false;
}
};

View File

@ -336,7 +336,7 @@ ASTPtr IQueryTreeNode::toAST(const ConvertToASTOptions & options) const
{
auto converted_node = toASTImpl(options);
if (auto * ast_with_alias = dynamic_cast<ASTWithAlias *>(converted_node.get()))
if (auto * /*ast_with_alias*/ _ = dynamic_cast<ASTWithAlias *>(converted_node.get()))
converted_node->setAlias(alias);
return converted_node;

View File

@ -285,7 +285,7 @@ public:
return;
}
if (const auto * join_node = node->as<JoinNode>())
if (const auto * /*join_node*/ _ = node->as<JoinNode>())
{
can_wrap_result_columns_with_nullable |= getContext()->getSettingsRef()[Setting::join_use_nulls];
return;

View File

@ -77,7 +77,7 @@ public:
for (size_t i = 0; i < function->getArguments().getNodes().size(); i++)
{
if (const auto * func = function->getArguments().getNodes()[i]->as<FunctionNode>())
if (const auto * /*func*/ _ = function->getArguments().getNodes()[i]->as<FunctionNode>())
{
func_id = i;
break;

View File

@ -676,7 +676,7 @@ QueryTreeNodePtr QueryTreeBuilder::buildExpression(const ASTPtr & expression, co
result = std::move(query_node);
}
else if (const auto * select_with_union_query = expression->as<ASTSelectWithUnionQuery>())
else if (const auto * /*select_with_union_query*/ _ = expression->as<ASTSelectWithUnionQuery>())
{
auto query_node = buildSelectWithUnionExpression(expression, false /*is_subquery*/, {} /*cte_name*/, context);
result = std::move(query_node);

View File

@ -7,6 +7,8 @@
#include <Functions/FunctionHelpers.h>
#include <Storages/IStorage.h>
#include <Storages/MaterializedView/RefreshSet.h>
#include <Storages/MaterializedView/RefreshTask.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/JoinUtils.h>
@ -417,9 +419,16 @@ QueryTreeNodePtr IdentifierResolver::tryResolveTableIdentifierFromDatabaseCatalo
bool is_temporary_table = storage_id.getDatabaseName() == DatabaseCatalog::TEMPORARY_DATABASE;
StoragePtr storage;
TableLockHolder storage_lock;
if (is_temporary_table)
storage = DatabaseCatalog::instance().getTable(storage_id, context);
else if (auto refresh_task = context->getRefreshSet().tryGetTaskForInnerTable(storage_id))
{
/// If table is the target of a refreshable materialized view, it needs additional
/// synchronization to make sure we see all of the data (e.g. if refresh happened on another replica).
std::tie(storage, storage_lock) = refresh_task->getAndLockTargetTable(storage_id, context);
}
else
storage = DatabaseCatalog::instance().tryGetTable(storage_id, context);
@ -434,7 +443,8 @@ QueryTreeNodePtr IdentifierResolver::tryResolveTableIdentifierFromDatabaseCatalo
if (!storage)
return {};
auto storage_lock = storage->lockForShare(context->getInitialQueryId(), context->getSettingsRef()[Setting::lock_acquire_timeout]);
if (!storage_lock)
storage_lock = storage->lockForShare(context->getInitialQueryId(), context->getSettingsRef()[Setting::lock_acquire_timeout]);
auto storage_snapshot = storage->getStorageSnapshot(storage->getInMemoryMetadataPtr(), context);
auto result = std::make_shared<TableNode>(std::move(storage), std::move(storage_lock), std::move(storage_snapshot));
if (is_temporary_table)

View File

@ -1951,7 +1951,7 @@ QueryAnalyzer::QueryTreeNodesWithNames QueryAnalyzer::resolveUnqualifiedMatcher(
{
bool table_expression_in_resolve_process = nearest_query_scope->table_expressions_in_resolve_process.contains(table_expression.get());
if (auto * array_join_node = table_expression->as<ArrayJoinNode>())
if (auto * /*array_join_node*/ _ = table_expression->as<ArrayJoinNode>())
{
if (table_expressions_column_nodes_with_names_stack.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR,
@ -4045,9 +4045,10 @@ ProjectionNames QueryAnalyzer::resolveSortNodeList(QueryTreeNodePtr & sort_node_
const auto * constant_node = sort_node.getFillTo()->as<ConstantNode>();
if (!constant_node || !isColumnedAsNumber(constant_node->getResultType()))
throw Exception(ErrorCodes::INVALID_WITH_FILL_EXPRESSION,
throw Exception(
ErrorCodes::INVALID_WITH_FILL_EXPRESSION,
"Sort FILL TO expression must be constant with numeric type. Actual {}. In scope {}",
sort_node.getFillFrom()->formatASTForErrorMessage(),
sort_node.getFillTo()->formatASTForErrorMessage(),
scope.scope_node->formatASTForErrorMessage());
size_t fill_to_expression_projection_names_size = fill_to_expression_projection_names.size();

View File

@ -45,7 +45,7 @@ public:
bool needChildVisit(const QueryTreeNodePtr &, const QueryTreeNodePtr & child)
{
if (auto * lambda_node = child->as<LambdaNode>())
if (auto * /*lambda_node*/ _ = child->as<LambdaNode>())
{
updateAliasesIfNeeded(child, true /*is_lambda_node*/);
return false;

View File

@ -171,6 +171,8 @@ void ClientApplicationBase::init(int argc, char ** argv)
("stage", po::value<std::string>()->default_value("complete"), "Request query processing up to specified stage: complete,fetch_columns,with_mergeable_state,with_mergeable_state_after_aggregation,with_mergeable_state_after_aggregation_and_limit")
("progress", po::value<ProgressOption>()->implicit_value(ProgressOption::TTY, "tty")->default_value(ProgressOption::DEFAULT, "default"), "Print progress of queries execution - to TTY: tty|on|1|true|yes; to STDERR non-interactive mode: err; OFF: off|0|false|no; DEFAULT - interactive to TTY, non-interactive is off")
("progress-table", po::value<ProgressOption>()->implicit_value(ProgressOption::TTY, "tty")->default_value(ProgressOption::DEFAULT, "default"), "Print a progress table with changing metrics during query execution - to TTY: tty|on|1|true|yes; to STDERR non-interactive mode: err; OFF: off|0|false|no; DEFAULT - interactive to TTY, non-interactive is off.")
("enable-progress-table-toggle", po::value<bool>()->default_value(true), "Enable toggling of the progress table by pressing the control key (Space). Only applicable in interactive mode with the progress table enabled.")
("disable_suggestion,A", "Disable loading suggestion data. Note that suggestion data is loaded asynchronously through a second connection to ClickHouse server. Also it is reasonable to disable suggestion if you want to paste a query with TAB characters. Shorthand option -A is for those who get used to mysql client.")
("wait_for_suggestions_to_load", "Load suggestion data synchonously.")
@ -316,6 +318,26 @@ void ClientApplicationBase::init(int argc, char ** argv)
break;
}
}
if (options.count("progress-table"))
{
switch (options["progress-table"].as<ProgressOption>())
{
case DEFAULT:
config().setString("progress-table", "default");
break;
case OFF:
config().setString("progress-table", "off");
break;
case TTY:
config().setString("progress-table", "tty");
break;
case ERR:
config().setString("progress-table", "err");
break;
}
}
if (options.count("enable-progress-table-toggle"))
getClientConfiguration().setBool("enable-progress-table-toggle", options["enable-progress-table-toggle"].as<bool>());
if (options.count("echo"))
getClientConfiguration().setBool("echo", true);
if (options.count("disable_suggestion"))

View File

@ -1,8 +1,9 @@
#include <Client/ClientBase.h>
#include <Client/LineReader.h>
#include <Client/ClientBaseHelpers.h>
#include <Client/TestHint.h>
#include <Client/InternalTextLogs.h>
#include <Client/LineReader.h>
#include <Client/TerminalKeystrokeInterceptor.h>
#include <Client/TestHint.h>
#include <Client/TestTags.h>
#include <base/safeExit.h>
@ -288,6 +289,7 @@ ClientBase::ClientBase(
: std_in(in_fd_)
, std_out(out_fd_)
, progress_indication(output_stream_, in_fd_, err_fd_)
, progress_table(output_stream_, in_fd_, err_fd_)
, in_fd(in_fd_)
, out_fd(out_fd_)
, err_fd(err_fd_)
@ -438,6 +440,8 @@ void ClientBase::onData(Block & block, ASTPtr parsed_query)
/// If results are written INTO OUTFILE, we can avoid clearing progress to avoid flicker.
if (need_render_progress && tty_buf && (!select_into_file || select_into_file_and_stdout))
progress_indication.clearProgressOutput(*tty_buf);
if (need_render_progress_table && tty_buf && (!select_into_file || select_into_file_and_stdout))
progress_table.clearTableOutput(*tty_buf);
try
{
@ -453,13 +457,20 @@ void ClientBase::onData(Block & block, ASTPtr parsed_query)
/// Received data block is immediately displayed to the user.
output_format->flush();
/// Restore progress bar after data block.
/// Restore progress bar and progress table after data block.
if (need_render_progress && tty_buf)
{
if (select_into_file && !select_into_file_and_stdout)
error_stream << "\r";
progress_indication.writeProgress(*tty_buf);
}
if (need_render_progress_table && tty_buf)
{
if (!need_render_progress && select_into_file && !select_into_file_and_stdout)
error_stream << "\r";
bool toggle_enabled = getClientConfiguration().getBool("enable-progress-table-toggle", true);
progress_table.writeTable(*tty_buf, show_progress_table.load(), toggle_enabled);
}
}
@ -468,6 +479,8 @@ void ClientBase::onLogData(Block & block)
initLogsOutputStream();
if (need_render_progress && tty_buf)
progress_indication.clearProgressOutput(*tty_buf);
if (need_render_progress_table && tty_buf)
progress_table.clearTableOutput(*tty_buf);
logs_out_stream->writeLogs(block);
logs_out_stream->flush();
}
@ -796,16 +809,23 @@ void ClientBase::setDefaultFormatsAndCompressionFromConfiguration()
}
}
void ClientBase::initTTYBuffer(ProgressOption progress)
void ClientBase::initTTYBuffer(ProgressOption progress_option, ProgressOption progress_table_option)
{
if (tty_buf)
return;
if (progress == ProgressOption::OFF || (!is_interactive && progress == ProgressOption::DEFAULT))
{
need_render_progress = false;
return;
}
if (progress_option == ProgressOption::OFF || (!is_interactive && progress_option == ProgressOption::DEFAULT))
need_render_progress = false;
if (progress_table_option == ProgressOption::OFF || (!is_interactive && progress_table_option == ProgressOption::DEFAULT))
need_render_progress_table = false;
if (!need_render_progress && !need_render_progress_table)
return;
/// If need_render_progress and need_render_progress_table are enabled,
/// use ProgressOption that was set for the progress bar for progress table as well.
ProgressOption progress = progress_option ? progress_option : progress_table_option;
static constexpr auto tty_file_name = "/dev/tty";
@ -851,7 +871,20 @@ void ClientBase::initTTYBuffer(ProgressOption progress)
tty_buf = std::make_unique<WriteBufferFromFileDescriptor>(STDERR_FILENO, buf_size);
}
else
{
need_render_progress = false;
need_render_progress_table = false;
}
}
void ClientBase::initKeystrokeInterceptor()
{
if (is_interactive && need_render_progress_table && getClientConfiguration().getBool("enable-progress-table-toggle", true))
{
keystroke_interceptor = std::make_unique<TerminalKeystrokeInterceptor>(in_fd, error_stream);
keystroke_interceptor->registerCallback(' ', [this]() { show_progress_table = !show_progress_table; });
}
}
void ClientBase::updateSuggest(const ASTPtr & ast)
@ -1115,6 +1148,34 @@ void ClientBase::receiveResult(ASTPtr parsed_query, Int32 signals_before_stop, b
std::exception_ptr local_format_error;
if (keystroke_interceptor)
{
try
{
keystroke_interceptor->startIntercept();
}
catch (const DB::Exception &)
{
error_stream << getCurrentExceptionMessage(false);
keystroke_interceptor.reset();
}
}
SCOPE_EXIT({
if (keystroke_interceptor)
{
try
{
keystroke_interceptor->stopIntercept();
}
catch (...)
{
error_stream << getCurrentExceptionMessage(false);
keystroke_interceptor.reset();
}
}
});
while (true)
{
Stopwatch receive_watch(CLOCK_MONOTONIC_COARSE);
@ -1266,6 +1327,8 @@ void ClientBase::onEndOfStream()
{
if (need_render_progress && tty_buf)
progress_indication.clearProgressOutput(*tty_buf);
if (need_render_progress_table && tty_buf)
progress_table.clearTableOutput(*tty_buf);
if (output_format)
{
@ -1344,9 +1407,15 @@ void ClientBase::onProfileEvents(Block & block)
thread_times[host_name].peak_memory_usage = value;
}
progress_indication.updateThreadEventData(thread_times);
progress_table.updateTable(block);
if (need_render_progress && tty_buf)
progress_indication.writeProgress(*tty_buf);
if (need_render_progress_table && tty_buf)
{
bool toggle_enabled = getClientConfiguration().getBool("enable-progress-table-toggle", true);
progress_table.writeTable(*tty_buf, show_progress_table.load(), toggle_enabled);
}
if (profile_events.print)
{
@ -1357,6 +1426,8 @@ void ClientBase::onProfileEvents(Block & block)
initLogsOutputStream();
if (need_render_progress && tty_buf)
progress_indication.clearProgressOutput(*tty_buf);
if (need_render_progress_table && tty_buf)
progress_table.clearTableOutput(*tty_buf);
logs_out_stream->writeProfileEvents(block);
logs_out_stream->flush();
@ -1838,6 +1909,8 @@ void ClientBase::cancelQuery()
connection->sendCancel();
if (need_render_progress && tty_buf)
progress_indication.clearProgressOutput(*tty_buf);
if (need_render_progress_table && tty_buf)
progress_table.clearTableOutput(*tty_buf);
if (is_interactive)
output_stream << "Cancelling query." << std::endl;
@ -1904,6 +1977,7 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
processed_rows = 0;
written_first_block = false;
progress_indication.resetProgress();
progress_table.resetTable();
profile_events.watch.restart();
{
@ -2030,6 +2104,8 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
initLogsOutputStream();
if (need_render_progress && tty_buf)
progress_indication.clearProgressOutput(*tty_buf);
if (need_render_progress_table && tty_buf)
progress_table.clearTableOutput(*tty_buf);
logs_out_stream->writeProfileEvents(profile_events.last_block);
logs_out_stream->flush();
@ -2043,6 +2119,8 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
output_stream << processed_rows << " row" << (processed_rows == 1 ? "" : "s") << " in set. ";
output_stream << "Elapsed: " << progress_indication.elapsedSeconds() << " sec. ";
progress_indication.writeFinalProgress();
if (need_render_progress_table && show_progress_table)
progress_table.writeFinalTable();
output_stream << std::endl << std::endl;
}
else
@ -2498,7 +2576,7 @@ bool ClientBase::addMergeTreeSettings(ASTCreateQuery & ast_create)
|| ast_create.storage->engine->name.find("MergeTree") == std::string::npos)
return false;
auto all_changed = cmd_merge_tree_settings.allChanged();
auto all_changed = cmd_merge_tree_settings.changes();
if (all_changed.begin() == all_changed.end())
return false;
@ -2512,11 +2590,11 @@ bool ClientBase::addMergeTreeSettings(ASTCreateQuery & ast_create)
auto & storage_settings = *ast_create.storage->settings;
bool added_new_setting = false;
for (const auto & setting : all_changed)
for (const auto & change : all_changed)
{
if (!storage_settings.changes.tryGet(setting.getName()))
if (!storage_settings.changes.tryGet(change.name))
{
storage_settings.changes.emplace_back(setting.getName(), setting.getValue());
storage_settings.changes.emplace_back(change.name, change.value);
added_new_setting = true;
}
}

View File

@ -1,21 +1,22 @@
#pragma once
#include <Client/ProgressTable.h>
#include <Client/Suggest.h>
#include <Common/QueryFuzzer.h>
#include <Common/DNSResolver.h>
#include <Common/InterruptListener.h>
#include <Common/ProgressIndication.h>
#include <Common/ShellCommand.h>
#include <Common/Stopwatch.h>
#include <Core/ExternalTable.h>
#include <Core/Settings.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTCreateQuery.h>
#include <Poco/ConsoleChannel.h>
#include <Poco/SimpleFileChannel.h>
#include <Poco/SplitterChannel.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTCreateQuery.h>
#include <Poco/Util/Application.h>
#include <Common/DNSResolver.h>
#include <Common/InterruptListener.h>
#include <Common/ProgressIndication.h>
#include <Common/QueryFuzzer.h>
#include <Common/ShellCommand.h>
#include <Common/Stopwatch.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <Storages/SelectQueryInfo.h>
@ -68,6 +69,7 @@ ProgressOption toProgressOption(std::string progress);
std::istream& operator>> (std::istream & in, ProgressOption & progress);
class InternalTextLogs;
class TerminalKeystrokeInterceptor;
class WriteBufferFromFileDescriptor;
/**
@ -245,7 +247,8 @@ protected:
void setDefaultFormatsAndCompressionFromConfiguration();
void initTTYBuffer(ProgressOption progress);
void initTTYBuffer(ProgressOption progress_option, ProgressOption progress_table_option);
void initKeystrokeInterceptor();
/// Should be one of the first, to be destroyed the last,
/// since other members can use them.
@ -255,6 +258,8 @@ protected:
/// Client context is a context used only by the client to parse queries, process query parameters and to connect to clickhouse-server.
ContextMutablePtr client_context;
std::unique_ptr<TerminalKeystrokeInterceptor> keystroke_interceptor;
bool is_interactive = false; /// Use either interactive line editing interface or batch mode.
bool delayed_interactive = false;
@ -332,7 +337,10 @@ protected:
String server_display_name;
ProgressIndication progress_indication;
ProgressTable progress_table;
bool need_render_progress = true;
bool need_render_progress_table = true;
std::atomic_bool show_progress_table = false;
bool need_render_profile_events = true;
bool written_first_block = false;
size_t processed_rows = 0; /// How many rows have been read or written.

View File

@ -1,5 +1,4 @@
#include <Client/ClientApplicationBase.h>
#include <Core/BaseSettingsProgramOptions.h>
namespace DB
{
@ -19,17 +18,6 @@ namespace ErrorCodes
namespace
{
/// Define transparent hash to we can use
/// std::string_view with the containers
struct TransparentStringHash
{
using is_transparent = void;
size_t operator()(std::string_view txt) const
{
return std::hash<std::string_view>{}(txt);
}
};
/*
* This functor is used to parse command line arguments and replace dashes with underscores,
* allowing options to be specified using either dashes or underscores.
@ -89,41 +77,8 @@ void ClientApplicationBase::parseAndCheckOptions(OptionsDescription & options_de
if (allow_merge_tree_settings)
{
/// Add merge tree settings manually, because names of some settings
/// may clash. Query settings have higher priority and we just
/// skip ambiguous merge tree settings.
auto & main_options = options_description.main_description.value();
std::unordered_set<std::string, TransparentStringHash, std::equal_to<>> main_option_names;
for (const auto & option : main_options.options())
main_option_names.insert(option->long_name());
for (const auto & setting : cmd_merge_tree_settings.all())
{
const auto add_setting = [&](const std::string_view name)
{
if (auto it = main_option_names.find(name); it != main_option_names.end())
return;
if (allow_repeated_settings)
addProgramOptionAsMultitoken(cmd_merge_tree_settings, main_options, name, setting);
else
addProgramOption(cmd_merge_tree_settings, main_options, name, setting);
};
const auto & setting_name = setting.getName();
add_setting(setting_name);
const auto & settings_to_aliases = MergeTreeSettings::Traits::settingsToAliases();
if (auto it = settings_to_aliases.find(setting_name); it != settings_to_aliases.end())
{
for (const auto alias : it->second)
{
add_setting(alias);
}
}
}
cmd_merge_tree_settings.addToProgramOptionsIfNotPresent(main_options, allow_repeated_settings);
}
/// Parse main commandline options.

View File

@ -0,0 +1,474 @@
#include "ProgressTable.h"
#include "Common/AllocatorWithMemoryTracking.h"
#include "Common/ProfileEvents.h"
#include "base/defines.h"
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <Core/Block.h>
#include <IO/Operators.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <Interpreters/ProfileEventsExt.h>
#include <base/terminalColors.h>
#include <Common/TerminalSize.h>
#include <Common/formatReadable.h>
#include <format>
#include <numeric>
#include <unordered_map>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
namespace
{
constexpr UInt64 THREAD_GROUP_ID = 0;
constexpr std::string_view CLEAR_TO_END_OF_LINE = "\033[K";
constexpr std::string_view CLEAR_TO_END_OF_SCREEN = "\033[0J";
constexpr std::string_view RESET_COLOR = "\033[0m";
constexpr std::string_view HIDE_CURSOR = "\033[?25l";
constexpr std::string_view SHOW_CURSOR = "\033[?25h";
std::string moveUpNLines(size_t N)
{
return std::format("\033[{}A", N);
}
std::string formatReadableValue(ProfileEvents::ValueType value_type, double value)
{
switch (value_type)
{
case ProfileEvents::ValueType::Number:
return formatReadableQuantity(value, /*precision*/ std::floor(value) == value && fabs(value) < 1000 ? 0 : 2);
case ProfileEvents::ValueType::Bytes:
return formatReadableSizeWithDecimalSuffix(value);
case ProfileEvents::ValueType::Nanoseconds:
return formatReadableTime(value);
case ProfileEvents::ValueType::Microseconds:
return formatReadableTime(value * 1e3);
case ProfileEvents::ValueType::Milliseconds:
return formatReadableTime(value * 1e6);
}
}
const std::unordered_map<std::string_view, ProfileEvents::Event> & getEventNameToEvent()
{
/// TODO: MemoryTracker::USAGE_EVENT_NAME and PEAK_USAGE_EVENT_NAME
static std::unordered_map<std::string_view, ProfileEvents::Event> event_name_to_event;
if (!event_name_to_event.empty())
return event_name_to_event;
for (ProfileEvents::Event event = ProfileEvents::Event(0); event < ProfileEvents::end(); ++event)
{
event_name_to_event.emplace(ProfileEvents::getName(event), event);
}
return event_name_to_event;
}
std::string_view setColorForProgress(double progress, double max_progress)
{
constexpr std::array<std::string_view, 5> colors = {
"\033[38;5;236m", /// Dark Grey
"\033[38;5;250m", /// Light Grey
"\033[38;5;34m", /// Green
"\033[38;5;226m", /// Yellow
"\033[1;33m", /// Bold
};
constexpr std::array<double, 4> fractions = {
0.05,
0.20,
0.80,
0.95,
};
if (max_progress == 0)
return colors.front();
auto fraction = progress / max_progress;
auto dist = std::upper_bound(fractions.begin(), fractions.end(), fraction) - fractions.begin();
return colors[dist];
}
std::string_view setColorForBytesBasedMetricsProgress(double progress)
{
constexpr std::array<std::string_view, 7> colors = {
"\033[38;5;236m", /// Dark Grey
"\033[38;5;250m", /// Light Grey
"\033[38;5;34m", /// Green
"\033[38;5;226m", /// Yellow
"\033[38;5;208m", /// Orange
"\033[1;33m", /// Bold
"\033[38;5;160m", /// Red: corresponds to >= 1T/s. Not a practical scenario.
};
/// Bytes.
constexpr std::array<uint64_t, 6> thresholds = {
1LL << 20,
100LL << 20,
1'000LL << 20,
10'000LL << 20,
100'000LL << 20,
1'000'000LL << 20,
};
auto dist = std::upper_bound(thresholds.begin(), thresholds.end(), progress) - thresholds.begin();
return colors[dist];
}
std::string_view setColorForTimeBasedMetricsProgress(ProfileEvents::ValueType value_type, double progress)
{
/// Time units in a second.
auto units = [](ProfileEvents::ValueType t) -> double
{
switch (t)
{
case ProfileEvents::ValueType::Milliseconds:
return 1e3;
case ProfileEvents::ValueType::Microseconds:
return 1e6;
case ProfileEvents::ValueType::Nanoseconds:
return 1e9;
default:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Wrong value type, expecting time units");
}
}(value_type);
constexpr std::array<std::string_view, 5> colors = {
"\033[38;5;236m", /// Dark Grey
"\033[38;5;250m", /// Light Grey
"\033[38;5;34m", /// Green
"\033[38;5;226m", /// Yellow
"\033[1;33m" /// Bold
};
const std::array<double, 4> thresholds = {0.001 * units, 0.01 * units, 0.1 * units, 1.0 * units};
auto dist = std::upper_bound(thresholds.begin(), thresholds.end(), progress) - thresholds.begin();
return colors[dist];
}
std::string_view setColorForStaleMetrics()
{
return "\033[38;5;236m"; /// Dark Grey
}
std::string_view setColorForDocumentation()
{
return "\033[38;5;236m"; /// Dark Grey
}
template <typename Out>
void writeWithWidth(Out & out, std::string_view s, size_t width)
{
if (s.size() >= width)
out << s << " ";
else
out << s << std::string(width - s.size(), ' ');
}
template <typename Out>
void writeWithWidthStrict(Out & out, std::string_view s, size_t width)
{
chassert(width != 0);
if (s.size() > width)
out << s.substr(0, width - 1) << "";
else
out << s;
}
}
void ProgressTable::writeTable(WriteBufferFromFileDescriptor & message, bool show_table, bool toggle_enabled)
{
std::lock_guard lock{mutex};
if (!show_table)
{
if (written_first_block)
message << CLEAR_TO_END_OF_SCREEN;
if (toggle_enabled)
{
message << HIDE_CURSOR;
message << "\n";
message << "Press the space key to toggle the display of the progress table.";
message << moveUpNLines(1);
message.next();
}
return;
}
const auto & event_name_to_event = getEventNameToEvent();
size_t terminal_width = getTerminalWidth(in_fd, err_fd);
if (terminal_width < column_event_name_width + COLUMN_VALUE_WIDTH + COLUMN_PROGRESS_WIDTH)
return;
if (metrics.empty())
return;
message << HIDE_CURSOR;
message << "\n";
writeWithWidth(message, COLUMN_EVENT_NAME, column_event_name_width);
writeWithWidth(message, COLUMN_VALUE, COLUMN_VALUE_WIDTH);
writeWithWidth(message, COLUMN_PROGRESS, COLUMN_PROGRESS_WIDTH);
writeWithWidth(message, COLUMN_DOCUMENTATION_NAME, COLUMN_DOCUMENTATION_WIDTH);
message << CLEAR_TO_END_OF_LINE;
double elapsed_sec = watch.elapsedSeconds();
for (auto & [name, per_host_info] : metrics)
{
message << "\n";
if (per_host_info.isStale(elapsed_sec))
message << setColorForStaleMetrics();
writeWithWidth(message, name, column_event_name_width);
auto value = per_host_info.getSummaryValue();
auto value_type = getValueType(event_name_to_event.at(name));
writeWithWidth(message, formatReadableValue(value_type, value), COLUMN_VALUE_WIDTH);
/// Get the maximum progress before it is updated in getSummaryProgress.
auto max_progress = per_host_info.getMaxProgress();
auto progress = per_host_info.getSummaryProgress(elapsed_sec);
switch (value_type)
{
case ProfileEvents::ValueType::Number:
message << setColorForProgress(progress, max_progress);
break;
case ProfileEvents::ValueType::Bytes:
message << setColorForBytesBasedMetricsProgress(progress);
break;
case ProfileEvents::ValueType::Milliseconds:
[[fallthrough]];
case ProfileEvents::ValueType::Microseconds:
[[fallthrough]];
case ProfileEvents::ValueType::Nanoseconds:
message << setColorForTimeBasedMetricsProgress(value_type, progress);
break;
}
writeWithWidth(message, formatReadableValue(value_type, progress) + "/s", COLUMN_PROGRESS_WIDTH);
message << setColorForDocumentation();
const auto * doc = getDocumentation(event_name_to_event.at(name));
writeWithWidthStrict(message, doc, COLUMN_DOCUMENTATION_WIDTH);
message << RESET_COLOR;
message << CLEAR_TO_END_OF_LINE;
}
message << moveUpNLines(tableSize());
message.next();
}
void ProgressTable::writeFinalTable()
{
std::lock_guard lock{mutex};
const auto & event_name_to_event = getEventNameToEvent();
size_t terminal_width = getTerminalWidth(in_fd, err_fd);
if (terminal_width < column_event_name_width + COLUMN_VALUE_WIDTH)
return;
if (metrics.empty())
return;
output_stream << "\n";
writeWithWidth(output_stream, COLUMN_EVENT_NAME, column_event_name_width);
writeWithWidth(output_stream, COLUMN_VALUE, COLUMN_VALUE_WIDTH);
for (auto & [name, per_host_info] : metrics)
{
output_stream << "\n";
writeWithWidth(output_stream, name, column_event_name_width);
auto value = per_host_info.getSummaryValue();
auto value_type = getValueType(event_name_to_event.at(name));
writeWithWidth(output_stream, formatReadableValue(value_type, value), COLUMN_VALUE_WIDTH);
}
}
void ProgressTable::updateTable(const Block & block)
{
const auto & array_thread_id = typeid_cast<const ColumnUInt64 &>(*block.getByName("thread_id").column).getData();
const auto & names = typeid_cast<const ColumnString &>(*block.getByName("name").column);
const auto & host_names = typeid_cast<const ColumnString &>(*block.getByName("host_name").column);
const auto & array_values = typeid_cast<const ColumnInt64 &>(*block.getByName("value").column).getData();
const auto & array_type = typeid_cast<const ColumnInt8 &>(*block.getByName("type").column).getData();
const double time_now = watch.elapsedSeconds();
size_t max_event_name_width = COLUMN_EVENT_NAME.size();
std::lock_guard lock{mutex};
const auto & event_name_to_event = getEventNameToEvent();
for (size_t row_num = 0, rows = block.rows(); row_num < rows; ++row_num)
{
auto thread_id = array_thread_id[row_num];
/// In ProfileEvents packets thread id 0 specifies common profiling information
/// for all threads executing current query on specific host. So instead of summing per thread
/// consumption it's enough to look for data with thread id 0.
if (thread_id != THREAD_GROUP_ID)
continue;
auto value = array_values[row_num];
auto name = names.getDataAt(row_num).toString();
auto host_name = host_names.getDataAt(row_num).toString();
auto type = static_cast<ProfileEvents::Type>(array_type[row_num]);
/// Got unexpected event name.
if (!event_name_to_event.contains(name))
continue;
/// Store non-zero values.
if (value == 0)
continue;
auto it = metrics.find(name);
/// If the table has already been written, then do not add new metrics to avoid jitter.
if (it == metrics.end() && written_first_block)
continue;
if (!written_first_block)
it = metrics.try_emplace(name).first;
it->second.updateHostValue(host_name, type, value, time_now);
max_event_name_width = std::max(max_event_name_width, name.size());
}
if (!written_first_block)
column_event_name_width = max_event_name_width + 1;
written_first_block = true;
}
void ProgressTable::clearTableOutput(WriteBufferFromFileDescriptor & message)
{
message << CLEAR_TO_END_OF_SCREEN;
message << SHOW_CURSOR;
message.next();
}
void ProgressTable::resetTable()
{
std::lock_guard lock{mutex};
watch.restart();
metrics.clear();
written_first_block = false;
}
size_t ProgressTable::tableSize() const
{
/// Number of lines + header.
return metrics.empty() ? 0 : metrics.size() + 1;
}
ProgressTable::MetricInfo::MetricInfo(ProfileEvents::Type t) : type(t)
{
}
void ProgressTable::MetricInfo::updateValue(Int64 new_value, double new_time)
{
/// If the value has not been updated for a long time,
/// reset the time in snapshots to one second ago.
if (new_time - new_snapshot.time >= 0.5 || new_snapshot.time == 0)
{
prev_shapshot = {new_snapshot.value, new_time - 1.0};
cur_shapshot = {new_snapshot.value, new_time - 1.0};
}
switch (type)
{
case ProfileEvents::Type::INCREMENT:
new_snapshot.value = new_snapshot.value + new_value;
break;
case ProfileEvents::Type::GAUGE:
new_snapshot.value = new_value;
break;
}
new_snapshot.time = new_time;
if (new_snapshot.time - cur_shapshot.time >= 0.5)
prev_shapshot = std::exchange(cur_shapshot, new_snapshot);
update_time = new_time;
}
bool ProgressTable::MetricInfo::isStale(double now) const
{
return update_time != 0 && now - update_time >= 5.0;
}
double ProgressTable::MetricInfo::calculateProgress(double time_now) const
{
/// If the value has not been updated for a long time, the progress is 0.
if (time_now - new_snapshot.time >= 0.5)
return 0;
return (cur_shapshot.value - prev_shapshot.value) / (cur_shapshot.time - prev_shapshot.time);
}
double ProgressTable::MetricInfo::getValue() const
{
return new_snapshot.value;
}
void ProgressTable::MetricInfoPerHost::updateHostValue(const HostName & host, ProfileEvents::Type type, Int64 new_value, double new_time)
{
auto it = host_to_metric.find(host);
if (it == host_to_metric.end())
it = host_to_metric.emplace(host, type).first;
it->second.updateValue(new_value, new_time);
}
double ProgressTable::MetricInfoPerHost::getSummaryValue()
{
return std::accumulate(
host_to_metric.cbegin(),
host_to_metric.cend(),
0.0,
[](double acc, const auto & host_data)
{
const MetricInfo & info = host_data.second;
return acc + info.getValue();
});
}
double ProgressTable::MetricInfoPerHost::getSummaryProgress(double time_now)
{
auto progress = std::accumulate(
host_to_metric.cbegin(),
host_to_metric.cend(),
0.0,
[time_now](double acc, const auto & host_data)
{
const MetricInfo & info = host_data.second;
return acc + info.calculateProgress(time_now);
});
max_progress = std::max(max_progress, progress);
return progress;
}
double ProgressTable::MetricInfoPerHost::getMaxProgress() const
{
return max_progress;
}
bool ProgressTable::MetricInfoPerHost::isStale(double now) const
{
return std::all_of(host_to_metric.cbegin(), host_to_metric.cend(), [&now](const auto & p) { return p.second.isStale(now); });
}
}

120
src/Client/ProgressTable.h Normal file
View File

@ -0,0 +1,120 @@
#pragma once
#include <Interpreters/ProfileEventsExt.h>
#include <base/types.h>
#include <Common/ProfileEvents.h>
#include <Common/Stopwatch.h>
#include <map>
#include <mutex>
#include <ostream>
#include <string_view>
#include <unordered_map>
#include <unistd.h>
namespace DB
{
class WriteBufferFromFileDescriptor;
class Block;
class ProgressTable
{
public:
explicit ProgressTable(std::ostream & output_stream_, int in_fd_ = STDIN_FILENO, int err_fd_ = STDERR_FILENO)
: output_stream(output_stream_), in_fd(in_fd_), err_fd(err_fd_)
{
}
/// Write progress table with metrics.
void writeTable(WriteBufferFromFileDescriptor & message, bool show_table, bool toggle_enabled);
void clearTableOutput(WriteBufferFromFileDescriptor & message);
void writeFinalTable();
/// Update the metric values. They can be updated from:
/// onProfileEvents in clickhouse-client;
void updateTable(const Block & block);
/// Reset progress table values.
void resetTable();
private:
class MetricInfo
{
public:
explicit MetricInfo(ProfileEvents::Type t);
void updateValue(Int64 new_value, double new_time);
double calculateProgress(double time_now) const;
double getValue() const;
bool isStale(double now) const;
private:
const ProfileEvents::Type type;
struct Snapshot
{
Int64 value = 0;
double time = 0;
};
/// The previous and current snapshots are used by `calculateProgress`.
/// They contain information that is outdated by about a second.
/// The new snapshot is used by `updateValue` and `getValue`.
/// We don't use a new snapshot in `calculateProgress` because the time elapsed since
/// the previous update may be very small, causing jitter.
Snapshot prev_shapshot;
Snapshot cur_shapshot;
Snapshot new_snapshot;
double update_time = 0.0;
};
class MetricInfoPerHost
{
public:
using HostName = String;
void updateHostValue(const HostName & host, ProfileEvents::Type type, Int64 new_value, double new_time);
double getSummaryValue();
double getSummaryProgress(double time_now);
double getMaxProgress() const;
bool isStale(double now) const;
private:
std::unordered_map<HostName, MetricInfo> host_to_metric;
double max_progress = 0;
};
size_t tableSize() const;
using MetricName = String;
/// The server periodically sends Block with profile events.
/// This information is stored here.
std::map<MetricName, MetricInfoPerHost> metrics;
/// It is possible concurrent access to the metrics.
std::mutex mutex;
/// Track query execution time on client.
Stopwatch watch;
bool written_first_block = false;
size_t column_event_name_width = 20;
static constexpr std::string_view COLUMN_EVENT_NAME = "Event name";
static constexpr std::string_view COLUMN_VALUE = "Value";
static constexpr std::string_view COLUMN_PROGRESS = "Progress";
static constexpr std::string_view COLUMN_DOCUMENTATION_NAME = "Documentation";
static constexpr size_t COLUMN_VALUE_WIDTH = 20;
static constexpr size_t COLUMN_PROGRESS_WIDTH = 20;
static constexpr size_t COLUMN_DOCUMENTATION_WIDTH = 100;
std::ostream & output_stream;
int in_fd;
int err_fd;
};
}

View File

@ -0,0 +1,118 @@
#include <chrono>
#include <memory>
#include <Client/TerminalKeystrokeInterceptor.h>
#include <Common/Exception.h>
#include <ostream>
#include <termios.h>
#include <unistd.h>
#include <base/defines.h>
namespace DB::ErrorCodes
{
extern const int SYSTEM_ERROR;
}
namespace DB
{
TerminalKeystrokeInterceptor::TerminalKeystrokeInterceptor(int fd_, std::ostream & error_stream_) : fd(fd_), error_stream(error_stream_)
{
}
TerminalKeystrokeInterceptor::~TerminalKeystrokeInterceptor()
{
try
{
stopIntercept();
}
catch (...)
{
error_stream << getCurrentExceptionMessage(false);
}
}
void TerminalKeystrokeInterceptor::registerCallback(char key, TerminalKeystrokeInterceptor::Callback cb)
{
callbacks.emplace(key, cb);
}
void TerminalKeystrokeInterceptor::startIntercept()
{
std::lock_guard<std::mutex> lock(mutex);
if (intercept_thread && intercept_thread->joinable())
return;
chassert(!orig_termios);
stop_requested = false;
/// Save terminal state.
orig_termios = std::make_unique<struct termios>();
if (tcgetattr(fd, orig_termios.get()))
throw DB::ErrnoException(
DB::ErrorCodes::SYSTEM_ERROR, "Cannot get the state of the terminal referred to by file descriptor '{}'", fd);
/// Set terminal to the raw terminal mode.
struct termios raw = *orig_termios;
raw.c_lflag &= ~(ECHO | ICANON);
raw.c_cc[VMIN] = 0;
raw.c_cc[VTIME] = 1;
if (tcsetattr(fd, TCSAFLUSH, &raw))
throw DB::ErrnoException(
DB::ErrorCodes::SYSTEM_ERROR, "Cannot set terminal to the raw mode for the terminal referred to by file descriptor '{}'", fd);
intercept_thread = std::make_unique<std::thread>(&TerminalKeystrokeInterceptor::run, this, callbacks);
}
void TerminalKeystrokeInterceptor::stopIntercept()
{
stop_requested = true;
std::lock_guard<std::mutex> lock(mutex);
if (intercept_thread && intercept_thread->joinable())
{
intercept_thread->join();
intercept_thread.reset();
}
/// Set to the original (canonical) terminal mode.
if (orig_termios)
{
if (tcsetattr(fd, TCSAFLUSH, orig_termios.get()))
throw DB::ErrnoException(
DB::ErrorCodes::SYSTEM_ERROR,
"Cannot set terminal to the original (canonical) mode for the terminal referred to by file descriptor '{}'",
fd);
orig_termios.reset();
}
}
void TerminalKeystrokeInterceptor::run(TerminalKeystrokeInterceptor::CallbackMap map)
{
while (!stop_requested)
{
runImpl(map);
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
}
void TerminalKeystrokeInterceptor::runImpl(const DB::TerminalKeystrokeInterceptor::CallbackMap & map) const
{
char ch;
if (read(fd, &ch, 1) > 0)
{
auto it = map.find(ch);
if (it != map.end())
{
auto fn = it->second;
fn();
}
}
}
}

View File

@ -0,0 +1,44 @@
#pragma once
#include <functional>
#include <memory>
#include <mutex>
#include <ostream>
#include <thread>
#include <unordered_map>
struct termios;
namespace DB
{
class TerminalKeystrokeInterceptor
{
using Callback = std::function<void()>;
using CallbackMap = std::unordered_map<char, Callback>;
public:
explicit TerminalKeystrokeInterceptor(int fd_, std::ostream & error_stream_);
~TerminalKeystrokeInterceptor();
void registerCallback(char key, Callback cb);
void startIntercept();
void stopIntercept();
private:
void run(CallbackMap);
void runImpl(const CallbackMap &) const;
const int fd;
std::ostream & error_stream;
std::mutex mutex;
CallbackMap callbacks;
std::unique_ptr<std::thread> intercept_thread;
std::unique_ptr<struct termios> orig_termios;
std::atomic_bool stop_requested = false;
};
}

View File

@ -54,7 +54,7 @@ AsyncLoader::Pool::Pool(const AsyncLoader::PoolInitializer & init)
init.metric_threads,
init.metric_active_threads,
init.metric_scheduled_threads,
/* max_threads = */ std::numeric_limits<size_t>::max(), // Unlimited number of threads, we do worker management ourselves
/* max_threads = */ ThreadPool::MAX_THEORETICAL_THREAD_COUNT, // Unlimited number of threads, we do worker management ourselves
/* max_free_threads = */ 0, // We do not require free threads
/* queue_size = */ 0)) // Unlimited queue to avoid blocking during worker spawning
{}

View File

@ -40,6 +40,7 @@ struct CalendarTimeInterval
/// Add this interval to the timestamp. First months, then seconds.
/// Gets weird near month boundaries: October 31 + 1 month = December 1.
/// The returned timestamp is always 28-31 days greater than t.
std::chrono::sys_seconds advance(std::chrono::system_clock::time_point t) const;
/// Rounds the timestamp down to the nearest timestamp "aligned" with this interval.

File diff suppressed because it is too large Load Diff

View File

@ -149,6 +149,15 @@ namespace ProfileEvents
static const Event num_counters;
};
enum class ValueType : uint8_t
{
Number,
Bytes,
Milliseconds,
Microseconds,
Nanoseconds,
};
/// Increment a counter for event. Thread-safe.
void increment(Event event, Count amount = 1);
@ -165,6 +174,9 @@ namespace ProfileEvents
/// Get description of event by identifier. Returns statically allocated string.
const char * getDocumentation(Event event);
/// Get value type of event by identifier. Returns enum value.
ValueType getValueType(Event event);
/// Get index just after last event identifier.
Event end();

View File

@ -1005,7 +1005,7 @@ void QueryFuzzer::fuzzExpressionList(ASTExpressionList & expr_list)
{
for (auto & child : expr_list.children)
{
if (auto * literal = typeid_cast<ASTLiteral *>(child.get()))
if (auto * /*literal*/ _ = typeid_cast<ASTLiteral *>(child.get()))
{
if (fuzz_rand() % 13 == 0)
child = fuzzLiteralUnderExpressionList(child);

View File

@ -9,10 +9,9 @@
#include <Core/Field.h>
#include <Parsers/ASTExplainQuery.h>
#include <Parsers/IAST.h>
#include <Parsers/IAST_fwd.h>
#include <Parsers/NullsAction.h>
#include <Common/randomSeed.h>
#include "Parsers/IAST_fwd.h"
namespace DB

View File

@ -52,7 +52,7 @@ public:
{
if (!ISchedulerNode::equals(other))
return false;
if (auto * o = dynamic_cast<FairPolicy *>(other))
if (auto * _ = dynamic_cast<FairPolicy *>(other))
return true;
return false;
}

View File

@ -34,7 +34,7 @@ public:
{
if (!ISchedulerNode::equals(other))
return false;
if (auto * o = dynamic_cast<FifoQueue *>(other))
if (auto * _ = dynamic_cast<FifoQueue *>(other))
return true;
return false;
}

View File

@ -43,7 +43,7 @@ public:
{
if (!ISchedulerNode::equals(other))
return false;
if (auto * o = dynamic_cast<PriorityPolicy *>(other))
if (auto * _ = dynamic_cast<PriorityPolicy *>(other))
return true;
return false;
}

View File

@ -99,7 +99,7 @@ public:
{
if (!ISchedulerNode::equals(other))
return false;
if (auto * o = dynamic_cast<SchedulerRoot *>(other))
if (auto * _ = dynamic_cast<SchedulerRoot *>(other))
return true;
return false;
}

97
src/Common/StopToken.cpp Normal file
View File

@ -0,0 +1,97 @@
#include <Common/StopToken.h>
#include <base/defines.h>
#include <exception>
#include <mutex>
#include <thread>
struct StopState
{
/// A pretty inefficient implementation (mutex instead of spinlock, std::list instead of intrusive list,
/// shared_ptr instead of custom refcounting), but this currently doesn't matter. If you want to use this in some
/// performance-sensitive code, feel free to reimplement, probably similar to folly::CancellationToken implementation
/// (but if it's actually performance-sensitive then maybe try to avoid using this at all: all this pointer chasing,
/// reference conting, and callbacks can't be very fast.)
std::mutex mutex;
std::atomic<bool> stopped {false};
std::list<StopCallback *> callbacks;
};
bool StopToken::stop_requested() const
{
return state && state->stopped.load();
}
StopSource::StopSource() : state(std::make_shared<StopState>()) {}
bool StopSource::request_stop()
{
std::list<StopCallback *> callbacks;
{
std::lock_guard lock(state->mutex);
if (state->stopped.exchange(true))
{
chassert(state->callbacks.empty());
return false;
}
callbacks = std::move(state->callbacks);
}
std::exception_ptr exception;
for (StopCallback * cb : callbacks)
{
/// If one StopCallback's destroys another StopCallback, this may deadlock because the second
/// StopCallback's destructor will wait for both callbacks to return (if it's later in the `callbacks` list).
/// This can be prevented by allowing ~StopCallback() to set some cancellation flag that we'd check here,
/// but this doesn't seem worth the trouble. Just don't have such complicated callbacks.
try
{
cb->callback();
}
catch (...)
{
if (!exception)
exception = std::current_exception();
}
cb->returned.store(true);
}
if (exception)
std::rethrow_exception(exception);
return true;
}
StopCallback::StopCallback(const StopToken & token, Callback cb) : state(token.state), callback(std::move(cb))
{
if (state == nullptr)
return;
std::unique_lock lock(state->mutex);
if (state->stopped.load())
{
lock.unlock();
state = nullptr;
callback();
}
else
{
state->callbacks.push_back(this);
it = std::prev(state->callbacks.end());
}
}
StopCallback::~StopCallback()
{
if (state == nullptr)
return;
std::unique_lock lock(state->mutex);
if (state->stopped.load())
{
lock.unlock();
while (!returned.load())
std::this_thread::yield();
}
else
{
state->callbacks.erase(it);
}
}

71
src/Common/StopToken.h Normal file
View File

@ -0,0 +1,71 @@
#pragma once
#include <memory>
#include <functional>
#include <list>
#include <atomic>
/// Just like std::stop_token, which isn't available yet. A.k.a. folly::CancellationToken.
/// When we switch to C++20, delete this and use std::stop_token instead.
struct StopState;
using StopStatePtr = std::shared_ptr<StopState>;
class StopToken
{
public:
StopToken() = default;
StopToken(const StopToken &) = default;
StopToken(StopToken &&) = default;
StopToken & operator=(const StopToken &) = default;
StopToken & operator=(StopToken &&) = default;
bool stop_requested() const;
bool stop_possible() const { return state != nullptr; }
private:
friend class StopSource;
friend class StopCallback;
StopStatePtr state;
explicit StopToken(StopStatePtr s) : state(std::move(s)) {}
};
class StopSource
{
public:
StopSource();
StopSource(const StopSource &) = default;
StopSource(StopSource &&) = default;
StopSource & operator=(const StopSource &) = default;
StopSource & operator=(StopSource &&) = default;
StopToken get_token() const { return StopToken(state); }
bool request_stop();
private:
StopStatePtr state;
};
class StopCallback
{
public:
using Callback = std::function<void()>;
StopCallback(const StopToken & token, Callback cb);
/// If the callback is already running, waits for it to return.
~StopCallback();
StopCallback(const StopCallback &) = delete;
StopCallback & operator=(const StopCallback &) = delete;
private:
friend class StopSource;
StopStatePtr state;
std::list<StopCallback *>::iterator it;
Callback callback;
std::atomic_bool returned {false};
};

View File

@ -47,6 +47,47 @@ namespace ProfileEvents
}
namespace
{
struct ScopedDecrement
{
std::optional<std::reference_wrapper<std::atomic<int64_t>>> atomic_var;
// Deleted copy constructor and copy assignment operator
ScopedDecrement(const ScopedDecrement&) = delete;
ScopedDecrement& operator=(const ScopedDecrement&) = delete;
// Move constructor
ScopedDecrement(ScopedDecrement&& other) noexcept
: atomic_var(std::move(other.atomic_var))
{
other.atomic_var.reset();
}
// Move assignment operator
ScopedDecrement& operator=(ScopedDecrement&& other) noexcept
{
if (this != &other)
{
atomic_var.swap(other.atomic_var);
}
return *this;
}
explicit ScopedDecrement(std::atomic<int64_t>& var)
: atomic_var(var)
{
atomic_var->get().fetch_sub(1, std::memory_order_relaxed);
}
~ScopedDecrement()
{
if (atomic_var)
atomic_var->get().fetch_add(1, std::memory_order_relaxed);
}
};
}
class JobWithPriority
{
public:
@ -55,6 +96,8 @@ public:
Job job;
Priority priority;
CurrentMetrics::Increment metric_increment;
ScopedDecrement available_threads_decrement;
DB::OpenTelemetry::TracingContextOnThread thread_trace_context;
/// Call stacks of all jobs' schedulings leading to this one
@ -62,11 +105,20 @@ public:
bool enable_job_stack_trace = false;
Stopwatch job_create_time;
// Deleted copy constructor and copy assignment operator
JobWithPriority(const JobWithPriority&) = delete;
JobWithPriority& operator=(const JobWithPriority&) = delete;
// Move constructor and move assignment operator
JobWithPriority(JobWithPriority&&) noexcept = default;
JobWithPriority& operator=(JobWithPriority&&) noexcept = default;
JobWithPriority(
Job job_, Priority priority_, CurrentMetrics::Metric metric,
const DB::OpenTelemetry::TracingContextOnThread & thread_trace_context_,
bool capture_frame_pointers)
bool capture_frame_pointers, ScopedDecrement available_threads_decrement_)
: job(job_), priority(priority_), metric_increment(metric),
available_threads_decrement(std::move(available_threads_decrement_)),
thread_trace_context(thread_trace_context_), enable_job_stack_trace(capture_frame_pointers)
{
if (!capture_frame_pointers)
@ -85,8 +137,6 @@ public:
{
return job_create_time.elapsedMicroseconds();
}
};
static constexpr auto DEFAULT_THREAD_NAME = "ThreadPool";
@ -125,12 +175,19 @@ ThreadPoolImpl<Thread>::ThreadPoolImpl(
, queue_size(queue_size_ ? std::max(queue_size_, max_threads) : 0 /* zero means the queue is unlimited */)
, shutdown_on_exception(shutdown_on_exception_)
{
max_threads = std::min(max_threads, static_cast<size_t>(MAX_THEORETICAL_THREAD_COUNT));
max_free_threads = std::min(max_free_threads, static_cast<size_t>(MAX_THEORETICAL_THREAD_COUNT));
remaining_pool_capacity.store(max_threads, std::memory_order_relaxed);
available_threads.store(0, std::memory_order_relaxed);
}
template <typename Thread>
void ThreadPoolImpl<Thread>::setMaxThreads(size_t value)
{
value = std::min(value, static_cast<size_t>(MAX_THEORETICAL_THREAD_COUNT));
std::lock_guard lock(mutex);
remaining_pool_capacity.fetch_add(value - max_threads, std::memory_order_relaxed);
bool need_start_threads = (value > max_threads);
bool need_finish_free_threads = (value < max_free_threads);
@ -163,6 +220,7 @@ size_t ThreadPoolImpl<Thread>::getMaxThreads() const
template <typename Thread>
void ThreadPoolImpl<Thread>::setMaxFreeThreads(size_t value)
{
value = std::min(value, static_cast<size_t>(MAX_THEORETICAL_THREAD_COUNT));
std::lock_guard lock(mutex);
bool need_finish_free_threads = (value < max_free_threads);
@ -184,7 +242,6 @@ void ThreadPoolImpl<Thread>::setQueueSize(size_t value)
jobs.reserve(queue_size);
}
template <typename Thread>
template <typename ReturnType>
ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, Priority priority, std::optional<uint64_t> wait_microseconds, bool propagate_opentelemetry_tracing_context)
@ -207,6 +264,38 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, Priority priority, std:
return false;
};
// Decrement available_threads, scoped to the job lifecycle.
// This ensures that available_threads decreases when a new job starts
// and automatically increments when the job completes or goes out of scope.
ScopedDecrement available_threads_decrement(available_threads);
std::unique_ptr<ThreadFromThreadPool> new_thread;
// Load the current capacity
int64_t capacity = remaining_pool_capacity.load(std::memory_order_relaxed);
int64_t currently_available_threads = available_threads.load(std::memory_order_relaxed);
while (currently_available_threads <= 0 && capacity > 0)
{
if (remaining_pool_capacity.compare_exchange_weak(capacity, capacity - 1, std::memory_order_relaxed))
{
try
{
new_thread = std::make_unique<ThreadFromThreadPool>(*this);
break; // Exit the loop once a thread is successfully created.
}
catch (...)
{
// Failed to create the thread, restore capacity
remaining_pool_capacity.fetch_add(1, std::memory_order_relaxed);
std::lock_guard lock(mutex); // needed to change first_exception.
return on_error("failed to start the thread");
}
}
// capacity gets reloaded by (unsuccessful) compare_exchange_weak
currently_available_threads = available_threads.load(std::memory_order_relaxed);
}
{
Stopwatch watch;
std::unique_lock lock(mutex);
@ -219,6 +308,7 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, Priority priority, std:
auto pred = [this] { return !queue_size || scheduled_jobs < queue_size || shutdown; };
/// Wait for available threads or timeout
if (wait_microseconds) /// Check for optional. Condition is true if the optional is set. Even if the value is zero.
{
if (!job_finished.wait_for(lock, std::chrono::microseconds(*wait_microseconds), pred))
@ -230,48 +320,90 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, Priority priority, std:
if (shutdown)
return on_error("shutdown");
/// We must not to allocate any memory after we emplaced a job in a queue.
/// Because if an exception would be thrown, we won't notify a thread about job occurrence.
/// We must not allocate memory or perform operations that could throw exceptions after adding a job to the queue,
/// because if an exception occurs, it may leave the job in the queue without notifying any threads.
typename ThreadFromThreadPool::ThreadList::iterator thread_slot;
/// Check if there are enough threads to process job.
if (threads.size() < std::min(max_threads, scheduled_jobs + 1))
/// The decision to start a new thread is made outside the locked section.
/// However, thread load and demand can change dynamically, and decisions based on
/// atomic variables outside the critical section might become outdated by the time we acquire the lock.
/// This can lead to two possible scenarios:
///
/// 1) Relatively common: A new thread was started outside the lock, but by the time we acquire the lock,
/// demand for threads has decreased (e.g., other threads have finished their jobs and are now idle).
/// In this case, even though there are now enough threads, we still attempt to add the new thread
/// to the pool, provided it does not exceed the `max_threads` or `max_free_threads` limits. Keeping
/// an extra thread in the pool may help accommodate a sudden increase in demand without the need
/// to wait for thread creation.
///
/// 2) Very unlikely (but possible): Outside the lock, it appeared there were enough threads
/// to handle the workload. However, after acquiring the lock, it turns out the new thread
/// is needed (possibly because one of the existing threads was removed or became unavailable).
/// In this case, we create the thread inside the critical section, even though this may introduce
/// a small delay.
/// Check if we can add the thread created outside the critical section to the pool.
bool adding_new_thread = new_thread && threads.size() < std::min(max_threads, 1 /* current job */ + scheduled_jobs + max_free_threads);
// If we didn't create a new thread initially but realize we actually need one (unlikely scenario).
if (unlikely(!adding_new_thread && threads.size() < std::min(max_threads, scheduled_jobs + 1)))
{
try
{
threads.emplace_front();
remaining_pool_capacity.fetch_sub(1, std::memory_order_relaxed);
new_thread = std::make_unique<ThreadFromThreadPool>(*this);
}
catch (...)
{
// If thread creation fails, restore the pool capacity and return an error.
remaining_pool_capacity.fetch_add(1, std::memory_order_relaxed);
return on_error("failed to start the thread");
}
adding_new_thread = true;
}
if (adding_new_thread)
{
try
{
threads.emplace_front(std::move(new_thread));
thread_slot = threads.begin();
}
catch (...)
{
/// Most likely this is a std::bad_alloc exception
return on_error("cannot allocate thread slot");
}
try
{
Stopwatch watch2;
threads.front() = Thread([this, it = threads.begin()] { worker(it); });
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolThreadCreationMicroseconds : ProfileEvents::LocalThreadPoolThreadCreationMicroseconds,
watch2.elapsedMicroseconds());
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolExpansions : ProfileEvents::LocalThreadPoolExpansions);
}
catch (...)
{
threads.pop_front();
return on_error("cannot allocate thread");
return on_error("cannot emplace the thread in the pool");
}
}
else // we have a thread but there is no space for that in the pool.
{
new_thread.reset();
}
jobs.emplace(std::move(job),
priority,
metric_scheduled_jobs,
/// Tracing context on this thread is used as parent context for the sub-thread that runs the job
propagate_opentelemetry_tracing_context ? DB::OpenTelemetry::CurrentContext() : DB::OpenTelemetry::TracingContextOnThread(),
/// capture_frame_pointers
DB::Exception::enable_job_stack_trace);
try
{
jobs.emplace(std::move(job),
priority,
metric_scheduled_jobs,
/// Tracing context on this thread is used as parent context for the sub-thread that runs the job
propagate_opentelemetry_tracing_context ? DB::OpenTelemetry::CurrentContext() : DB::OpenTelemetry::TracingContextOnThread(),
/// capture_frame_pointers
DB::Exception::enable_job_stack_trace,
std::move(available_threads_decrement));
++scheduled_jobs;
++scheduled_jobs;
if (adding_new_thread)
(*thread_slot)->start(thread_slot);
}
catch (...)
{
if (adding_new_thread)
threads.pop_front();
return on_error("cannot start the job or thread");
}
}
/// Wake up a free thread to run the new job.
@ -291,30 +423,51 @@ void ThreadPoolImpl<Thread>::startNewThreadsNoLock()
/// Start new threads while there are more scheduled jobs in the queue and the limit `max_threads` is not reached.
while (threads.size() < std::min(scheduled_jobs, max_threads))
{
std::unique_ptr<ThreadFromThreadPool> new_thread;
int64_t capacity = remaining_pool_capacity.load(std::memory_order_relaxed);
while (capacity > 0)
{
if (remaining_pool_capacity.compare_exchange_weak(capacity, capacity - 1, std::memory_order_relaxed))
{
try
{
// Successfully decremented, attempt to create a new thread
new_thread = std::make_unique<ThreadFromThreadPool>(*this);
}
catch (...)
{
// Failed to create the thread, restore capacity
remaining_pool_capacity.fetch_add(1, std::memory_order_relaxed);
}
break; // Exit loop whether thread creation succeeded or not
}
}
if (!new_thread)
break; /// failed to start more threads
typename ThreadFromThreadPool::ThreadList::iterator thread_slot;
try
{
threads.emplace_front();
threads.emplace_front(std::move(new_thread));
thread_slot = threads.begin();
}
catch (...)
{
break; /// failed to start more threads
break;
}
try
{
Stopwatch watch;
threads.front() = Thread([this, it = threads.begin()] { worker(it); });
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolThreadCreationMicroseconds : ProfileEvents::LocalThreadPoolThreadCreationMicroseconds,
watch.elapsedMicroseconds());
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolExpansions : ProfileEvents::LocalThreadPoolExpansions);
(*thread_slot)->start(thread_slot);
}
catch (...)
{
threads.pop_front();
break; /// failed to start more threads
break;
}
}
}
@ -376,21 +529,29 @@ void ThreadPoolImpl<Thread>::finalize()
{
std::lock_guard lock(mutex);
shutdown = true;
/// We don't want threads to remove themselves from `threads` anymore, otherwise `thread.join()` will go wrong below in this function.
/// scheduleImpl doesn't check for shutdown outside the critical section,
/// so we set remaining_pool_capacity to a large negative value
/// (e.g., -MAX_THEORETICAL_THREAD_COUNT) to signal that no new threads are needed.
/// This effectively prevents any new threads from being started during shutdown.
remaining_pool_capacity.store(-MAX_THEORETICAL_THREAD_COUNT, std::memory_order_relaxed);
/// Disable thread self-removal from `threads`. Otherwise, if threads remove themselves,
/// the thread.join() operation will fail later in this function.
threads_remove_themselves = false;
}
/// Wake up threads so they can finish themselves.
/// Notify all threads to wake them up, so they can complete their work and exit gracefully.
new_job_or_shutdown.notify_all();
/// Wait for all currently running jobs to finish (we don't wait for all scheduled jobs here like the function wait() does).
for (auto & thread : threads)
/// Join all threads before clearing the list
for (auto& thread_ptr : threads)
{
thread.join();
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolShrinks : ProfileEvents::LocalThreadPoolShrinks);
if (thread_ptr)
thread_ptr->join();
}
// now it's safe to clear the threads
threads.clear();
}
@ -426,11 +587,88 @@ bool ThreadPoolImpl<Thread>::finished() const
return shutdown;
}
template <typename Thread>
void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_it)
ThreadPoolImpl<Thread>::ThreadFromThreadPool::ThreadFromThreadPool(ThreadPoolImpl& parent_pool_)
: parent_pool(parent_pool_)
, thread_state(ThreadState::Preparing) // Initial state is Preparing
{
Stopwatch watch2;
thread = Thread(&ThreadFromThreadPool::worker, this);
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolThreadCreationMicroseconds : ProfileEvents::LocalThreadPoolThreadCreationMicroseconds,
watch2.elapsedMicroseconds());
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolExpansions : ProfileEvents::LocalThreadPoolExpansions);
parent_pool.available_threads.fetch_add(1, std::memory_order_relaxed);
}
template <typename Thread>
void ThreadPoolImpl<Thread>::ThreadFromThreadPool::start(typename ThreadList::iterator & it)
{
/// the thread which created ThreadFromThreadPool should start it after adding it to the pool, or destroy it.
/// no parallelism is expected here. So the only valid transition for the start method is Preparing to Running.
chassert(thread_state.load(std::memory_order_relaxed) == ThreadState::Preparing);
thread_it = it;
thread_state.store(ThreadState::Running, std::memory_order_relaxed); /// now worker can start executing the main loop
}
template <typename Thread>
void ThreadPoolImpl<Thread>::ThreadFromThreadPool::join()
{
// Ensure the thread is joined before destruction if still joinable
if (thread.joinable())
thread.join();
}
template <typename Thread>
void ThreadPoolImpl<Thread>::ThreadFromThreadPool::removeSelfFromPoolNoPoolLock()
{
if (thread.joinable())
thread.detach();
parent_pool.threads.erase(thread_it);
}
template <typename Thread>
ThreadPoolImpl<Thread>::ThreadFromThreadPool::~ThreadFromThreadPool()
{
parent_pool.available_threads.fetch_sub(1, std::memory_order_relaxed);
// The thread is being destructed, so the remaining pool capacity increases
parent_pool.remaining_pool_capacity.fetch_add(1, std::memory_order_relaxed);
// If the worker was still waiting in the loop for thread initialization,
// signal it to terminate and be destroyed now.
thread_state.store(ThreadState::Destructing, std::memory_order_relaxed);
join();
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolShrinks : ProfileEvents::LocalThreadPoolShrinks);
}
template <typename Thread>
void ThreadPoolImpl<Thread>::ThreadFromThreadPool::worker()
{
DENY_ALLOCATIONS_IN_SCOPE;
CurrentMetrics::Increment metric_pool_threads(metric_threads);
// wait until the thread will be started
while (thread_state.load(std::memory_order_relaxed) == ThreadState::Preparing)
{
std::this_thread::yield(); // let's try to yield to avoid consuming too much CPU in the busy-loop
}
// If the thread transitions to Destructing, exit
if (thread_state.load(std::memory_order_relaxed) == ThreadState::Destructing)
return;
CurrentMetrics::Increment metric_pool_threads(parent_pool.metric_threads);
bool job_is_done = false;
std::exception_ptr exception_from_job;
@ -447,7 +685,7 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
{
Stopwatch watch;
std::unique_lock lock(mutex);
std::unique_lock lock(parent_pool.mutex);
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolLockWaitMicroseconds : ProfileEvents::LocalThreadPoolLockWaitMicroseconds,
watch.elapsedMicroseconds());
@ -458,48 +696,55 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
job_is_done = false;
if (exception_from_job)
{
if (!first_exception)
first_exception = exception_from_job;
if (shutdown_on_exception)
shutdown = true;
if (!parent_pool.first_exception)
parent_pool.first_exception = exception_from_job;
if (parent_pool.shutdown_on_exception)
{
parent_pool.shutdown = true;
// Prevent new thread creation, as explained in finalize.
parent_pool.remaining_pool_capacity.store(-MAX_THEORETICAL_THREAD_COUNT, std::memory_order_relaxed);
}
exception_from_job = {};
}
--scheduled_jobs;
--parent_pool.scheduled_jobs;
job_finished.notify_all();
if (shutdown)
new_job_or_shutdown.notify_all(); /// `shutdown` was set, wake up other threads so they can finish themselves.
parent_pool.job_finished.notify_all();
if (parent_pool.shutdown)
parent_pool.new_job_or_shutdown.notify_all(); /// `shutdown` was set, wake up other threads so they can finish themselves.
}
new_job_or_shutdown.wait(lock, [&] { return !jobs.empty() || shutdown || threads.size() > std::min(max_threads, scheduled_jobs + max_free_threads); });
parent_pool.new_job_or_shutdown.wait(lock, [this] {
return !parent_pool.jobs.empty()
|| parent_pool.shutdown
|| parent_pool.threads.size() > std::min(parent_pool.max_threads, parent_pool.scheduled_jobs + parent_pool.max_free_threads);
});
if (jobs.empty() || threads.size() > std::min(max_threads, scheduled_jobs + max_free_threads))
if (parent_pool.jobs.empty() || parent_pool.threads.size() > std::min(parent_pool.max_threads, parent_pool.scheduled_jobs + parent_pool.max_free_threads))
{
// We enter here if:
// - either this thread is not needed anymore due to max_free_threads excess;
// - or shutdown happened AND all jobs are already handled.
if (threads_remove_themselves)
{
thread_it->detach();
threads.erase(thread_it);
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolShrinks : ProfileEvents::LocalThreadPoolShrinks);
}
if (parent_pool.threads_remove_themselves)
removeSelfFromPoolNoPoolLock(); // Detach and remove itself from the pool
return;
}
/// boost::priority_queue does not provide interface for getting non-const reference to an element
/// to prevent us from modifying its priority. We have to use const_cast to force move semantics on JobWithPriority.
job_data = std::move(const_cast<JobWithPriority &>(jobs.top()));
jobs.pop();
job_data = std::move(const_cast<JobWithPriority &>(parent_pool.jobs.top()));
parent_pool.jobs.pop();
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolJobWaitTimeMicroseconds : ProfileEvents::LocalThreadPoolJobWaitTimeMicroseconds,
job_data->elapsedMicroseconds());
/// We don't run jobs after `shutdown` is set, but we have to properly dequeue all jobs and finish them.
if (shutdown)
if (parent_pool.shutdown)
{
{
ALLOW_ALLOCATIONS_IN_SCOPE;
@ -522,7 +767,7 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
if (DB::Exception::enable_job_stack_trace)
DB::Exception::setThreadFramePointers(std::move(job_data->frame_pointers));
CurrentMetrics::Increment metric_active_pool_threads(metric_active_threads);
CurrentMetrics::Increment metric_active_pool_threads(parent_pool.metric_active_threads);
if constexpr (!std::is_same_v<Thread, std::thread>)
{
@ -575,7 +820,6 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
}
}
template class ThreadPoolImpl<std::thread>;
template class ThreadPoolImpl<ThreadFromGlobalPoolImpl<false, true>>;
template class ThreadPoolImpl<ThreadFromGlobalPoolImpl<false, false>>;

View File

@ -32,7 +32,7 @@ class JobWithPriority;
*
* This thread pool can be used as a task queue.
* For example, you can create a thread pool with 10 threads (and queue of size 10) and schedule 1000 tasks
* - in this case you will be blocked to keep 10 tasks in fly.
* - in this case you will be blocked to keep 10 tasks in flight.
*
* Thread: std::thread or something with identical interface.
*/
@ -40,9 +40,57 @@ template <typename Thread>
class ThreadPoolImpl
{
public:
// used as 'unlimited' thread pool size
// on linux you can not have more threads even if the RAM is unlimited
// see https://docs.kernel.org/admin-guide/sysctl/kernel.html#threads-max
static constexpr int MAX_THEORETICAL_THREAD_COUNT = 0x3fffffff; // ~1 billion
using Job = std::function<void()>;
using Metric = CurrentMetrics::Metric;
// Subclass that encapsulates the thread and has the ability to remove itself from the pool.
class ThreadFromThreadPool
{
public:
using ThreadList = std::list<std::unique_ptr<ThreadFromThreadPool>>;
/// Constructor to initialize and start the thread (but not associate it with the pool)
explicit ThreadFromThreadPool(ThreadPoolImpl& parent_pool);
// Shift the thread state from Preparing to Running to allow the worker to start.
void start(ThreadList::iterator& it);
void join();
// Destructor to join the thread if needed (shift the state to Destructing if it was not running)
~ThreadFromThreadPool();
private:
ThreadPoolImpl& parent_pool;
Thread thread;
enum class ThreadState
{
Preparing,
Running,
Destructing
};
// Atomic state to track the thread's state
std::atomic<ThreadState> thread_state;
// Stores the position of the thread in the parent thread pool list
typename std::list<std::unique_ptr<ThreadFromThreadPool>>::iterator thread_it;
// Remove itself from the parent pool
void removeSelfFromPoolNoPoolLock();
// Worker does a busy loop (with yield) while the state is Preparing.
// After that, immediately returns if the state changed to Destructing,
// or starts the main working loop if the state is Running.
void worker();
};
/// Maximum number of threads is based on the number of physical cores.
ThreadPoolImpl(Metric metric_threads_, Metric metric_active_threads_, Metric metric_scheduled_jobs_);
@ -63,14 +111,14 @@ public:
size_t queue_size_,
bool shutdown_on_exception_ = true);
/// Add new job. Locks until number of scheduled jobs is less than maximum or exception in one of threads was thrown.
/// If any thread was throw an exception, first exception will be rethrown from this method,
/// and exception will be cleared.
/// Add new job. Locks until the number of scheduled jobs is less than the maximum or an exception in one of the threads was thrown.
/// If any thread has thrown an exception, the first exception will be rethrown from this method,
/// and the exception will be cleared.
/// Also throws an exception if cannot create thread.
/// Priority: lower is higher.
/// NOTE: Probably you should call wait() if exception was thrown. If some previously scheduled jobs are using some objects,
/// located on stack of current thread, the stack must not be unwinded until all jobs finished. However,
/// if ThreadPool is a local object, it will wait for all scheduled jobs in own destructor.
/// NOTE: Probably you should call wait() if an exception was thrown. If some previously scheduled jobs are using some objects,
/// located on the stack of the current thread, the stack must not be unwound until all jobs are finished. However,
/// if ThreadPool is a local object, it will wait for all scheduled jobs in its own destructor.
void scheduleOrThrowOnError(Job job, Priority priority = {});
/// Similar to scheduleOrThrowOnError(...). Wait for specified amount of time and schedule a job or return false.
@ -81,12 +129,12 @@ public:
/// Wait for all currently active jobs to be done.
/// You may call schedule and wait many times in arbitrary order.
/// If any thread was throw an exception, first exception will be rethrown from this method,
/// and exception will be cleared.
/// If any thread has thrown an exception, the first exception will be rethrown from this method,
/// and the exception will be cleared.
void wait();
/// Waits for all threads. Doesn't rethrow exceptions (use 'wait' method to rethrow exceptions).
/// You should not destroy object while calling schedule or wait methods from another threads.
/// You should not destroy the object while calling schedule or wait methods from other threads.
~ThreadPoolImpl();
/// Returns number of running and scheduled jobs.
@ -127,28 +175,40 @@ private:
size_t queue_size;
size_t scheduled_jobs = 0;
// Originally equals to max_threads, but changes dynamically.
// Decrements with every new thread started, increments when it finishes.
// If positive, then more threads can be started.
// When it comes to zero, it means that max_threads threads have already been started.
// it can be below zero when the threadpool is shutting down
std::atomic<int64_t> remaining_pool_capacity;
// Increments every time a new thread joins the thread pool or a job finishes.
// Decrements every time a task is scheduled.
// If positive, it means that there are more threads than jobs (and some are idle).
// If zero, it means that every thread has a job.
// If negative, it means that we have more jobs than threads.
std::atomic<int64_t> available_threads;
bool shutdown = false;
bool threads_remove_themselves = true;
const bool shutdown_on_exception = true;
boost::heap::priority_queue<JobWithPriority,boost::heap::stable<true>> jobs;
std::list<Thread> threads;
ThreadFromThreadPool::ThreadList threads;
std::exception_ptr first_exception;
std::stack<OnDestroyCallback> on_destroy_callbacks;
template <typename ReturnType>
ReturnType scheduleImpl(Job job, Priority priority, std::optional<uint64_t> wait_microseconds, bool propagate_opentelemetry_tracing_context = true);
void worker(typename std::list<Thread>::iterator thread_it);
/// Tries to start new threads if there are scheduled jobs and the limit `max_threads` is not reached. Must be called with `mutex` locked.
/// Tries to start new threads if there are scheduled jobs and the limit `max_threads` is not reached. Must be called with the mutex locked.
void startNewThreadsNoLock();
void finalize();
void onDestroy();
};
/// ThreadPool with std::thread for threads.
using FreeThreadPool = ThreadPoolImpl<std::thread>;

View File

@ -82,3 +82,16 @@ std::string formatReadableQuantity(double value, int precision)
formatReadableQuantity(value, out, precision);
return out.str();
}
void formatReadableTime(double ns, DB::WriteBuffer & out, int precision)
{
const char * units[] = {" ns", " us", " ms", " s"};
formatReadable(ns, out, precision, units, sizeof(units) / sizeof(units[0]), 1000);
}
std::string formatReadableTime(double ns, int precision)
{
DB::WriteBufferFromOwnString out;
formatReadableTime(ns, out, precision);
return out.str();
}

View File

@ -23,6 +23,9 @@ std::string formatReadableSizeWithDecimalSuffix(double value, int precision = 2)
void formatReadableQuantity(double value, DB::WriteBuffer & out, int precision = 2);
std::string formatReadableQuantity(double value, int precision = 2);
/// Prints the passed time in nanoseconds as 123.45 ms.
void formatReadableTime(double ns, DB::WriteBuffer & out, int precision = 2);
std::string formatReadableTime(double ns, int precision = 2);
/// Wrapper around value. If used with fmt library (e.g. for log messages),
/// value is automatically formatted as size with binary suffix.

View File

@ -176,7 +176,7 @@ unsigned getNumberOfCPUCoresToUseImpl()
///
/// On really big machines, SMT is detrimental to performance (+ ~5% overhead in ClickBench). On such machines, we limit ourself to the physical cores.
/// Few cores indicate it is a small machine, runs in a VM or is a limited cloud instance --> it is reasonable to use all the cores.
if (cores >= 32)
if (cores >= 64)
cores = physical_concurrency();
#endif

View File

@ -5,6 +5,7 @@
#include <Common/MemorySanitizer.h>
#include <Common/logger_useful.h>
#include <Common/randomSeed.h>
#include <Parsers/IAST.h>
#include <base/scope_guard.h>
#include <base/getPageSize.h>
#include <cstdio>

View File

@ -5,6 +5,7 @@
#include <IO/VarInt.h>
#include <Compression/CompressionFactory.h>
#include <Compression/CompressionCodecEncrypted.h>
#include <Parsers/IAST.h>
#include <Poco/Logger.h>
#include <Common/logger_useful.h>
#include <Common/safe_cast.h>

View File

@ -1,7 +1,7 @@
#include <Compression/CompressionCodecNone.h>
#include <Compression/CompressionInfo.h>
#include <Compression/CompressionFactory.h>
#include <Parsers/IAST.h>
namespace DB
{

View File

@ -4,7 +4,7 @@
#include <boost/noncopyable.hpp>
#include <Compression/CompressionInfo.h>
#include <base/types.h>
#include <Parsers/IAST.h>
#include <Parsers/IAST_fwd.h>
#include <Common/SipHash.h>

View File

@ -1,6 +1,5 @@
#pragma once
#include <Core/Settings.h>
#include <Core/Types_fwd.h>
#include <boost/program_options.hpp>

View File

@ -0,0 +1,156 @@
#pragma once
#include <base/types.h>
#include <Core/Defines.h>
namespace DistributedCache
{
static constexpr auto SERVER_CONFIG_PREFIX = "distributed_cache_server";
static constexpr auto CLIENT_CONFIG_PREFIX = "distributed_cache_client";
static constexpr auto REGISTERED_SERVERS_PATH = "registry";
static constexpr auto OFFSET_ALIGNMENT_PATH = "offset_alignment";
static constexpr auto DEFAULT_ZOOKEEPER_PATH = "/distributed_cache/";
static constexpr auto MAX_VIRTUAL_NODES = 100;
static constexpr auto DEFAULT_OFFSET_ALIGNMENT = 16 * 1024 * 1024;
static constexpr auto DEFAULT_MAX_PACKET_SIZE = DB::DBMS_DEFAULT_BUFFER_SIZE;
static constexpr auto MAX_UNACKED_INFLIGHT_PACKETS = 10;
static constexpr auto ACK_DATA_PACKET_WINDOW = 5;
static constexpr auto DEFAULT_CONNECTION_POOL_SIZE = 15000;
static constexpr auto DEFAULT_CONNECTION_TTL_SEC = 200;
static constexpr auto INITIAL_PROTOCOL_VERSION = 0;
static constexpr auto PROTOCOL_VERSION_WITH_QUERY_ID = 1;
static constexpr auto PROTOCOL_VERSION_WITH_MAX_INFLIGHT_PACKETS = 2;
static constexpr auto PROTOCOL_VERSION_WITH_GCS_TOKEN = 3;
static constexpr UInt32 PROTOCOL_VERSION_WITH_AZURE_AUTH = 4;
static constexpr UInt32 PROTOCOL_VERSION_WITH_TEMPORATY_DATA = 5;
static constexpr UInt32 CURRENT_PROTOCOL_VERSION = PROTOCOL_VERSION_WITH_TEMPORATY_DATA;
namespace Protocol
{
static constexpr auto MIN_VERSION_WITH_QUERY_ID_IN_REQUEST = 1;
/**
* Distributed cache protocol.
*
* Read request:
* Step1: (Client) calculate aligned_offset = aligned(file_offset) - alignment to file_offset.
* The alignment is equal to `offset_alignment`
* (stored on zookeeper for shared access from server and client),
* which allows to guarantee if the client needs offset x,
* then it will go to the server which contains a covering
* file segment for this offset.
* Step2: (Client) calculate hash(x, remote_path, aligned_file_offset) -> h,
* Step3: (Client) find distributed cache server: hash_ring(h) -> s
* Step4: (Client) connect to s:
* Client: `Hello` packet (protocol_version, request_type)
* Server: `Hello` packet (mutual_protocol_version)
* Step5: send general info:
* Client: `ReadInfo` packet (object storage connection info, remote paths, start offset, end offset)
* Step6:
* Server: `ReadRange` packet (includes read range), and send the data.
* Client: `Ok` packet
* in case of error (Client): `EndRequest` packet.
* Step7:
* Client: do Step1 from current file offset and get aligned_offset'.
* If aligned_offset' == aligned_offset, do Step6 again.
* else: go to Step2
*
* Write request:
* Step1: (Client) calculate hash(x, remote_path, file_offset) -> h,
* Step2: (Client) find distributed cache server: hash_ring(h) -> s
* Step3: (Client) connect to s:
* Client: `Hello` packet (protocol_version, request_type)
* Server: `Hello` packet (mutual_protocol_version)
* Step4: send general info:
* Client: `WriteInfo` packet (object storage connection info, remote_path, write range)
* Step5: write one file_segment's range
* Client: `WriteRange` packet (file_segment_start_offset), then process the write.
* Server: `Ok` (after each `Data` packet)
* or `Stop` packet (on error).
* Step6:
* if eof: do Step8
* else: do Step7
* Step7:
* do step1: h' = hash(x, remote_path, file_offset'), where file_offset' - start of the next file segment
* do step2: s' = hash_ring(h')
* if s' == s: do Step5
* else: do Step8 and go to Step3
* Step8:
* Client: `EndRequest` packet
* Server: `Ok` packet
*/
enum RequestType
{
Min = 0,
Read = 1, /// read-through cache
Write = 2, /// write-through cache
Remove = 3, /// drop cache
Show = 4, /// get current cache state
CurrentMetrics = 5, /// get CurrentMetrics
ProfileEvents = 6, /// get ProfileEvents
Max = 8,
};
namespace Client
{
enum Enum
{
Min = 0,
/// A hello packet for handshake between client and server.
Hello = 1,
/// A packet to start a new request: Read, Write, Remove, Show, etc
StartRequest = 2,
/// A packet to identify that the request is finished.
/// E.g. for read request we no longer need receiving data (even if requested read range is not finished);
/// for write request no data will no longer be sent.
EndRequest = 3,
/// A request to continue already started request but with a new information.
/// E.g. for read request - a new read range is needed;
/// for write request - a new write range will be sent.
ContinueRequest = 4,
/// Acknowledgement of `data_packet_ack_window` processed `DataPacket` packets.
AckRequest = 5,
Max = 6,
};
}
namespace Server
{
enum Enum
{
Min = 0,
/// A hello packet for handshake between client and server.
Hello = 1,
/// Identifies that a request was successfully executed.
Ok = 2,
/// Identifies a packet containing an exception message happened on server's size.
Error = 3,
/// Identifies a packet for a Read request.
ReadResult = 4,
/// Identifies a packet for incremental ProfileEvents during Read or Write request.
ProfileCounters = 5,
/// Identifies a packet for a Show request.
ShowResult = 6,
/// Identifies a packet for a ProfileEvents request.
ProfileEvents = 7,
/// Identifies a packet for a Metrics request.
Metrics = 8,
/// Identifies that this server cannot receive any more data for Write request
/// (cache is full or errors during insertion).
Stop = 9,
Max = 11
};
}
}
}

View File

@ -10,7 +10,6 @@
#include <Core/CompareHelper.h>
#include <Core/Defines.h>
#include <Core/Types.h>
#include <Core/UUID.h>
#include <base/DayNum.h>
#include <base/IPv4andIPv6.h>
#include <Common/AllocatorWithMemoryTracking.h>

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -54,6 +54,8 @@ class WriteBuffer;
M(CLASS_NAME, DefaultDatabaseEngine) \
M(CLASS_NAME, DefaultTableEngine) \
M(CLASS_NAME, Dialect) \
M(CLASS_NAME, DistributedCacheLogMode) /* Cloud only */ \
M(CLASS_NAME, DistributedCachePoolBehaviourOnLimit) /* Cloud only */ \
M(CLASS_NAME, DistributedDDLOutputMode) \
M(CLASS_NAME, DistributedProductMode) \
M(CLASS_NAME, Double) \

View File

@ -1,9 +1,10 @@
#include <Core/SettingsChangesHistory.h>
#include <Core/Defines.h>
#include <Core/SettingsChangesHistory.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <boost/algorithm/string.hpp>
namespace DB
{
@ -79,6 +80,22 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"enable_secure_identifiers", false, false, "New setting."},
{"min_free_disk_bytes_to_perform_insert", 0, 0, "New setting."},
{"min_free_disk_ratio_to_perform_insert", 0.0, 0.0, "New setting."},
{"cloud_mode_database_engine", 1, 1, "A setting for ClickHouse Cloud"},
{"allow_experimental_shared_set_join", 1, 1, "A setting for ClickHouse Cloud"},
{"read_through_distributed_cache", 0, 0, "A setting for ClickHouse Cloud"},
{"write_through_distributed_cache", 0, 0, "A setting for ClickHouse Cloud"},
{"distributed_cache_throw_on_error", 0, 0, "A setting for ClickHouse Cloud"},
{"distributed_cache_log_mode", "on_error", "on_error", "A setting for ClickHouse Cloud"},
{"distributed_cache_fetch_metrics_only_from_current_az", 1, 1, "A setting for ClickHouse Cloud"},
{"distributed_cache_connect_max_tries", 100, 100, "A setting for ClickHouse Cloud"},
{"distributed_cache_receive_response_wait_milliseconds", 60000, 60000, "A setting for ClickHouse Cloud"},
{"distributed_cache_receive_timeout_milliseconds", 10000, 10000, "A setting for ClickHouse Cloud"},
{"distributed_cache_wait_connection_from_pool_milliseconds", 100, 100, "A setting for ClickHouse Cloud"},
{"distributed_cache_bypass_connection_pool", 0, 0, "A setting for ClickHouse Cloud"},
{"distributed_cache_pool_behaviour_on_limit", "allocate_bypassing_pool", "allocate_bypassing_pool", "A setting for ClickHouse Cloud"},
{"distributed_cache_read_alignment", 0, 0, "A setting for ClickHouse Cloud"},
{"distributed_cache_max_unacked_inflight_packets", 10, 10, "A setting for ClickHouse Cloud"},
{"distributed_cache_data_packet_ack_window", 5, 5, "A setting for ClickHouse Cloud"},
}
},
{"24.9",
@ -543,15 +560,40 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
},
};
const std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> & getSettingsChangesHistory()
static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory::SettingsChanges>> merge_tree_settings_changes_history_initializer =
{
static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> settings_changes_history;
{"24.12",
{
}
},
{"24.11",
{
}
},
{"24.10",
{
}
},
{"24.9",
{
}
},
{"24.8",
{
{"deduplicate_merge_projection_mode", "ignore", "throw", "Do not allow to create inconsistent projection"}
}
},
};
static std::once_flag initialized_flag;
std::call_once(initialized_flag, []()
static void initSettingsChangesHistory(
std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> & settings_changes_history,
std::once_flag & initialized_flag,
std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory::SettingsChanges>> & initializer
)
{
std::call_once(initialized_flag, [&]()
{
for (const auto & setting_change : settings_changes_history_initializer)
for (const auto & setting_change : initializer)
{
/// Disallow duplicate keys in the settings changes history. Example:
/// {"21.2", {{"some_setting_1", false, true, "[...]"}}},
@ -564,7 +606,24 @@ const std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> & get
settings_changes_history[setting_change.first] = setting_change.second;
}
});
}
const std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> & getSettingsChangesHistory()
{
static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> settings_changes_history;
static std::once_flag initialized_flag;
initSettingsChangesHistory(settings_changes_history, initialized_flag, settings_changes_history_initializer);
return settings_changes_history;
}
const std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> & getMergeTreeSettingsChangesHistory()
{
static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> merge_tree_settings_changes_history;
static std::once_flag initialized_flag;
initSettingsChangesHistory(merge_tree_settings_changes_history, initialized_flag, merge_tree_settings_changes_history_initializer);
return merge_tree_settings_changes_history;
}
}

View File

@ -39,5 +39,6 @@ namespace SettingsChangesHistory
}
const std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> & getSettingsChangesHistory();
const std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> & getMergeTreeSettingsChangesHistory();
}

View File

@ -68,6 +68,14 @@ IMPLEMENT_SETTING_ENUM(OverflowMode, ErrorCodes::UNKNOWN_OVERFLOW_MODE,
{{"throw", OverflowMode::THROW},
{"break", OverflowMode::BREAK}})
IMPLEMENT_SETTING_ENUM(DistributedCacheLogMode, ErrorCodes::BAD_ARGUMENTS,
{{"nothing", DistributedCacheLogMode::LOG_NOTHING},
{"on_error", DistributedCacheLogMode::LOG_ON_ERROR},
{"all", DistributedCacheLogMode::LOG_ALL}})
IMPLEMENT_SETTING_ENUM(DistributedCachePoolBehaviourOnLimit, ErrorCodes::BAD_ARGUMENTS,
{{"wait", DistributedCachePoolBehaviourOnLimit::WAIT},
{"allocate_bypassing_pool", DistributedCachePoolBehaviourOnLimit::ALLOCATE_NEW_BYPASSING_POOL}});
IMPLEMENT_SETTING_ENUM(OverflowModeGroupBy, ErrorCodes::UNKNOWN_OVERFLOW_MODE,
{{"throw", OverflowMode::THROW},
@ -178,7 +186,8 @@ IMPLEMENT_SETTING_ENUM(LightweightMutationProjectionMode, ErrorCodes::BAD_ARGUME
{"rebuild", LightweightMutationProjectionMode::REBUILD}})
IMPLEMENT_SETTING_ENUM(DeduplicateMergeProjectionMode, ErrorCodes::BAD_ARGUMENTS,
{{"throw", DeduplicateMergeProjectionMode::THROW},
{{"ignore", DeduplicateMergeProjectionMode::IGNORE},
{"throw", DeduplicateMergeProjectionMode::THROW},
{"drop", DeduplicateMergeProjectionMode::DROP},
{"rebuild", DeduplicateMergeProjectionMode::REBUILD}})

View File

@ -10,7 +10,7 @@
#include <Core/ParallelReplicasMode.h>
#include <Formats/FormatSettings.h>
#include <IO/ReadSettings.h>
#include <Parsers/ASTSQLSecurity.h>
#include <Access/Common/SQLSecurityDefs.h>
#include <Parsers/IdentifierQuotingStyle.h>
#include <QueryPipeline/SizeLimits.h>
#include <Common/ShellCommandSettings.h>
@ -218,6 +218,9 @@ enum class DefaultTableEngine : uint8_t
DECLARE_SETTING_ENUM(DefaultTableEngine)
DECLARE_SETTING_ENUM(DistributedCacheLogMode)
DECLARE_SETTING_ENUM(DistributedCachePoolBehaviourOnLimit)
enum class CleanDeletedRows : uint8_t
{
@ -314,6 +317,7 @@ DECLARE_SETTING_ENUM(LightweightMutationProjectionMode)
enum class DeduplicateMergeProjectionMode : uint8_t
{
IGNORE,
THROW,
DROP,
REBUILD,

View File

@ -294,7 +294,7 @@ void SerializationObjectDeprecated<Parser>::serializeBinaryBulkWithMultipleStrea
}
settings.path.push_back(Substream::DeprecatedObjectData);
if (auto * stream = settings.getter(settings.path))
if (auto * /*stream*/ _ = settings.getter(settings.path))
{
state_object->nested_serialization->serializeBinaryBulkWithMultipleStreams(
*tuple_column, offset, limit, settings, state_object->nested_state);

View File

@ -2,8 +2,7 @@
#include <string>
#include <Core/MultiEnum.h>
#include <Parsers/IAST.h>
#include "IDataType.h"
#include <DataTypes/IDataType.h>
namespace DB
{

View File

@ -2,7 +2,7 @@
#include <mutex>
#include <Databases/IDatabase.h>
#include <Parsers/IAST.h>
#include <Parsers/IAST_fwd.h>
#include <Storages/IStorage_fwd.h>
#include <base/types.h>

View File

@ -6,7 +6,7 @@
#include <mutex>
#include <Databases/IDatabase.h>
#include <Parsers/IAST.h>
#include <Parsers/IAST_fwd.h>
#include <Storages/IStorage_fwd.h>
#include <base/types.h>

View File

@ -51,6 +51,11 @@ namespace Setting
extern const SettingsSetOperationMode union_default_mode;
}
namespace MergeTreeSetting
{
extern const MergeTreeSettingsString storage_policy;
}
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
@ -154,7 +159,7 @@ void DatabaseOrdinary::convertMergeTreeToReplicatedIfNeeded(ASTPtr ast, const Qu
/// Get table's storage policy
MergeTreeSettings default_settings = getContext()->getMergeTreeSettings();
auto policy = getContext()->getStoragePolicy(default_settings.storage_policy);
auto policy = getContext()->getStoragePolicy(default_settings[MergeTreeSetting::storage_policy]);
if (auto * query_settings = create_query->storage->settings)
if (Field * policy_setting = query_settings->changes.tryGet("storage_policy"))
policy = getContext()->getStoragePolicy(policy_setting->safeGet<String>());

View File

@ -1612,7 +1612,7 @@ void DatabaseReplicated::dropTable(ContextPtr local_context, const String & tabl
waitDatabaseStarted();
auto txn = local_context->getZooKeeperMetadataTransaction();
assert(!ddl_worker || !ddl_worker->isCurrentlyActive() || txn || startsWith(table_name, ".inner_id."));
assert(!ddl_worker || !ddl_worker->isCurrentlyActive() || txn || startsWith(table_name, ".inner_id.") || startsWith(table_name, ".tmp.inner_id."));
if (txn && txn->isInitialQuery() && !txn->isCreateOrReplaceQuery())
{
String metadata_zk_path = zookeeper_path + "/metadata/" + escapeForFileName(table_name);

View File

@ -1,5 +1,7 @@
#include <Databases/DatabaseReplicatedWorker.h>
#include <Databases/DatabaseReplicated.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/DDLTask.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Core/ServerUUID.h>
@ -21,6 +23,8 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
extern const int DATABASE_REPLICATION_FAILED;
extern const int NOT_A_LEADER;
extern const int QUERY_WAS_CANCELLED;
extern const int TABLE_IS_DROPPED;
extern const int UNFINISHED;
}
@ -229,7 +233,7 @@ bool DatabaseReplicatedDDLWorker::waitForReplicaToProcessAllEntries(UInt64 timeo
String DatabaseReplicatedDDLWorker::enqueueQueryImpl(const ZooKeeperPtr & zookeeper, DDLLogEntry & entry,
DatabaseReplicated * const database, bool committed)
DatabaseReplicated * const database, bool committed, Coordination::Requests additional_checks)
{
const String query_path_prefix = database->zookeeper_path + "/log/query-";
@ -244,15 +248,16 @@ String DatabaseReplicatedDDLWorker::enqueueQueryImpl(const ZooKeeperPtr & zookee
Coordination::Requests ops;
ops.emplace_back(zkutil::makeCreateRequest(counter_lock_path, database->getFullReplicaName(), zkutil::CreateMode::Ephemeral));
ops.emplace_back(zkutil::makeCreateRequest(counter_prefix, "", zkutil::CreateMode::EphemeralSequential));
ops.insert(ops.end(), additional_checks.begin(), additional_checks.end());
Coordination::Responses res;
Coordination::Error code = zookeeper->tryMulti(ops, res);
if (code == Coordination::Error::ZOK)
{
counter_path = dynamic_cast<const Coordination::CreateResponse &>(*res.back()).path_created;
counter_path = dynamic_cast<const Coordination::CreateResponse &>(*res[1]).path_created;
break;
}
else if (code != Coordination::Error::ZNODEEXISTS)
else if (res[0]->error != Coordination::Error::ZNODEEXISTS)
zkutil::KeeperMultiException::check(code, ops, res);
sleepForMilliseconds(50);
@ -305,7 +310,7 @@ String DatabaseReplicatedDDLWorker::tryEnqueueAndExecuteEntry(DDLLogEntry & entr
throw Exception(ErrorCodes::NOT_A_LEADER, "Cannot enqueue query on this replica, "
"because it has replication lag of {} queries. Try other replica.", max_log_ptr - our_log_ptr);
String entry_path = enqueueQuery(entry);
String entry_path = enqueueQueryImpl(zookeeper, entry, database, false, query_context->getDDLAdditionalChecksOnEnqueue());
auto try_node = zkutil::EphemeralNodeHolder::existing(entry_path + "/try", *zookeeper);
String entry_name = entry_path.substr(entry_path.rfind('/') + 1);
auto task = std::make_unique<DatabaseReplicatedTask>(entry_name, entry_path, database);
@ -317,12 +322,21 @@ String DatabaseReplicatedDDLWorker::tryEnqueueAndExecuteEntry(DDLLogEntry & entr
LOG_DEBUG(log, "Waiting for worker thread to process all entries before {}", entry_name);
UInt64 timeout = query_context->getSettingsRef()[Setting::database_replicated_initial_query_timeout_sec];
StopToken cancellation = query_context->getDDLQueryCancellation();
StopCallback cancellation_callback(cancellation, [&] { wait_current_task_change.notify_all(); });
{
std::unique_lock lock{mutex};
bool processed = wait_current_task_change.wait_for(lock, std::chrono::seconds(timeout), [&]()
{
assert(zookeeper->expired() || current_task <= entry_name);
return zookeeper->expired() || current_task == entry_name || stop_flag;
if (zookeeper->expired() || stop_flag)
throw Exception(ErrorCodes::DATABASE_REPLICATION_FAILED, "ZooKeeper session expired or replication stopped, try again");
if (cancellation.stop_requested())
throw Exception(ErrorCodes::QUERY_WAS_CANCELLED, "DDL query was cancelled");
return current_task == entry_name;
});
if (!processed)
@ -330,8 +344,8 @@ String DatabaseReplicatedDDLWorker::tryEnqueueAndExecuteEntry(DDLLogEntry & entr
"most likely because replica is busy with previous queue entries");
}
if (zookeeper->expired() || stop_flag)
throw Exception(ErrorCodes::DATABASE_REPLICATION_FAILED, "ZooKeeper session expired or replication stopped, try again");
if (entry.parent_table_uuid.has_value() && !checkParentTableExists(entry.parent_table_uuid.value()))
throw Exception(ErrorCodes::TABLE_IS_DROPPED, "Parent table doesn't exist");
processTask(*task, zookeeper);
@ -350,8 +364,9 @@ String DatabaseReplicatedDDLWorker::tryEnqueueAndExecuteEntry(DDLLogEntry & entr
return entry_path;
}
DDLTaskPtr DatabaseReplicatedDDLWorker::initAndCheckTask(const String & entry_name, String & out_reason, const ZooKeeperPtr & zookeeper)
DDLTaskPtr DatabaseReplicatedDDLWorker::initAndCheckTask(const String & entry_name, String & out_reason, const ZooKeeperPtr & zookeeper, bool dry_run)
{
if (!dry_run)
{
std::lock_guard lock{mutex};
if (current_task < entry_name)
@ -377,7 +392,7 @@ DDLTaskPtr DatabaseReplicatedDDLWorker::initAndCheckTask(const String & entry_na
zkutil::EventPtr wait_committed_or_failed = std::make_shared<Poco::Event>();
String try_node_path = fs::path(entry_path) / "try";
if (zookeeper->tryGet(try_node_path, initiator_name, nullptr, wait_committed_or_failed))
if (!dry_run && zookeeper->tryGet(try_node_path, initiator_name, nullptr, wait_committed_or_failed))
{
task->is_initial_query = initiator_name == task->host_id_str;
@ -458,6 +473,12 @@ DDLTaskPtr DatabaseReplicatedDDLWorker::initAndCheckTask(const String & entry_na
return {};
}
if (task->entry.parent_table_uuid.has_value() && !checkParentTableExists(task->entry.parent_table_uuid.value()))
{
out_reason = fmt::format("Parent table {} doesn't exist", task->entry.parent_table_uuid.value());
return {};
}
return task;
}
@ -468,6 +489,12 @@ bool DatabaseReplicatedDDLWorker::canRemoveQueueEntry(const String & entry_name,
return entry_number + logs_to_keep < max_log_ptr;
}
bool DatabaseReplicatedDDLWorker::checkParentTableExists(const UUID & uuid) const
{
auto [db, table] = DatabaseCatalog::instance().tryGetByUUID(uuid);
return db.get() == database && table != nullptr && !table->is_dropped.load() && !table->is_detached.load();
}
void DatabaseReplicatedDDLWorker::initializeLogPointer(const String & processed_entry_name)
{
updateMaxDDLEntryID(processed_entry_name);

View File

@ -33,7 +33,7 @@ public:
bool waitForReplicaToProcessAllEntries(UInt64 timeout_ms);
static String enqueueQueryImpl(const ZooKeeperPtr & zookeeper, DDLLogEntry & entry,
DatabaseReplicated * const database, bool committed = false); /// NOLINT
DatabaseReplicated * const database, bool committed = false, Coordination::Requests additional_checks = {}); /// NOLINT
UInt32 getLogPointer() const;
@ -43,9 +43,11 @@ private:
void initializeReplication();
void initializeLogPointer(const String & processed_entry_name);
DDLTaskPtr initAndCheckTask(const String & entry_name, String & out_reason, const ZooKeeperPtr & zookeeper) override;
DDLTaskPtr initAndCheckTask(const String & entry_name, String & out_reason, const ZooKeeperPtr & zookeeper, bool dry_run) override;
bool canRemoveQueueEntry(const String & entry_name, const Coordination::Stat & stat) override;
bool checkParentTableExists(const UUID & uuid) const;
DatabaseReplicated * const database;
mutable std::mutex mutex;
std::condition_variable wait_current_task_change;

View File

@ -6,7 +6,7 @@
#include <mutex>
#include <Databases/IDatabase.h>
#include <Parsers/IAST.h>
#include <Parsers/IAST_fwd.h>
#include <Storages/IStorage_fwd.h>
#include <base/types.h>

View File

@ -1,7 +1,7 @@
#pragma once
#include <base/types.h>
#include <Parsers/IAST.h>
#include <Parsers/IAST_fwd.h>
#include <Storages/IStorage_fwd.h>
#include <Databases/IDatabase.h>
#include <mutex>

View File

@ -46,6 +46,7 @@ void enableAllExperimentalSettings(ContextMutablePtr context)
context->setSetting("enable_zstd_qat_codec", 1);
context->setSetting("allow_create_index_without_type", 1);
context->setSetting("allow_experimental_s3queue", 1);
context->setSetting("allow_experimental_shared_set_join", 1);
}
}

View File

@ -585,7 +585,7 @@ private:
template <typename GetContainerFunc>
void getAttributeContainer(size_t attribute_index, GetContainerFunc && func) const
{
return const_cast<std::decay_t<decltype(*this)> *>(this)->template getAttributeContainer(attribute_index, std::forward<GetContainerFunc>(func));
return const_cast<std::decay_t<decltype(*this)> *>(this)->getAttributeContainer(attribute_index, std::forward<GetContainerFunc>(func));
}
template<typename ValueType>

View File

@ -11,20 +11,6 @@
using namespace DB;
namespace
{
bool withFileCache(const ReadSettings & settings)
{
return settings.remote_fs_cache && settings.enable_filesystem_cache;
}
bool withPageCache(const ReadSettings & settings, bool with_file_cache)
{
return settings.page_cache && !with_file_cache && settings.use_page_cache_for_disks_without_file_cache;
}
}
namespace DB
{
namespace ErrorCodes
@ -35,7 +21,7 @@ namespace ErrorCodes
size_t chooseBufferSizeForRemoteReading(const DB::ReadSettings & settings, size_t file_size)
{
/// Only when cache is used we could download bigger portions of FileSegments than what we actually gonna read within particular task.
if (!withFileCache(settings))
if (!settings.enable_filesystem_cache)
return settings.remote_fs_buffer_size;
/// Buffers used for prefetch and pre-download better to have enough size, but not bigger than the whole file.
@ -45,7 +31,6 @@ size_t chooseBufferSizeForRemoteReading(const DB::ReadSettings & settings, size_
ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(
ReadBufferCreator && read_buffer_creator_,
const StoredObjects & blobs_to_read_,
const std::string & cache_path_prefix_,
const ReadSettings & settings_,
std::shared_ptr<FilesystemCacheLog> cache_log_,
bool use_external_buffer_)
@ -54,12 +39,10 @@ ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(
, settings(settings_)
, blobs_to_read(blobs_to_read_)
, read_buffer_creator(std::move(read_buffer_creator_))
, cache_path_prefix(cache_path_prefix_)
, cache_log(settings.enable_filesystem_cache_log ? cache_log_ : nullptr)
, query_id(CurrentThread::getQueryId())
, use_external_buffer(use_external_buffer_)
, with_file_cache(withFileCache(settings))
, with_page_cache(withPageCache(settings, with_file_cache))
, with_file_cache(settings.enable_filesystem_cache)
, log(getLogger("ReadBufferFromRemoteFSGather"))
{
if (!blobs_to_read.empty())
@ -74,47 +57,7 @@ SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(c
}
current_object = object;
const auto & object_path = object.remote_path;
std::unique_ptr<ReadBufferFromFileBase> buf;
if (with_file_cache)
{
if (settings.remote_fs_cache->isInitialized())
{
auto cache_key = settings.remote_fs_cache->createKeyForPath(object_path);
buf = std::make_unique<CachedOnDiskReadBufferFromFile>(
object_path,
cache_key,
settings.remote_fs_cache,
FileCache::getCommonUser(),
[=, this]() { return read_buffer_creator(/* restricted_seek */true, object); },
settings,
query_id,
object.bytes_size,
/* allow_seeks */false,
/* use_external_buffer */true,
/* read_until_position */std::nullopt,
cache_log);
}
else
{
settings.remote_fs_cache->throwInitExceptionIfNeeded();
}
}
/// Can't wrap CachedOnDiskReadBufferFromFile in CachedInMemoryReadBufferFromFile because the
/// former doesn't support seeks.
if (with_page_cache && !buf)
{
auto inner = read_buffer_creator(/* restricted_seek */false, object);
auto cache_key = FileChunkAddress { .path = cache_path_prefix + object_path };
buf = std::make_unique<CachedInMemoryReadBufferFromFile>(
cache_key, settings.page_cache, std::move(inner), settings);
}
if (!buf)
buf = read_buffer_creator(/* restricted_seek */true, object);
auto buf = read_buffer_creator(/* restricted_seek */true, object);
if (read_until_position > start_offset && read_until_position < start_offset + object.bytes_size)
buf->setReadUntilPosition(read_until_position - start_offset);

View File

@ -26,7 +26,6 @@ public:
ReadBufferFromRemoteFSGather(
ReadBufferCreator && read_buffer_creator_,
const StoredObjects & blobs_to_read_,
const std::string & cache_path_prefix_,
const ReadSettings & settings_,
std::shared_ptr<FilesystemCacheLog> cache_log_,
bool use_external_buffer_);
@ -71,12 +70,10 @@ private:
const ReadSettings settings;
const StoredObjects blobs_to_read;
const ReadBufferCreator read_buffer_creator;
const std::string cache_path_prefix;
const std::shared_ptr<FilesystemCacheLog> cache_log;
const String query_id;
const bool use_external_buffer;
const bool with_file_cache;
const bool with_page_cache;
size_t read_until_position = 0;
size_t file_offset_of_buffer_end = 0;

Some files were not shown because too many files have changed in this diff Show More