Merge branch 'master' into fix-race-condition-between-inserts-and-dropping-mvs
This commit is contained in:
commit 8004953bfa

CHANGELOG.md (109 changed lines)
@@ -1,4 +1,5 @@
### Table of Contents
|
||||
**[ClickHouse release v22.11, 2022-11-17](#2211)**<br/>
|
||||
**[ClickHouse release v22.10, 2022-10-25](#2210)**<br/>
|
||||
**[ClickHouse release v22.9, 2022-09-22](#229)**<br/>
|
||||
**[ClickHouse release v22.8-lts, 2022-08-18](#228)**<br/>
|
||||
@@ -11,6 +12,108 @@
**[ClickHouse release v22.1, 2022-01-18](#221)**<br/>
|
||||
**[Changelog for 2021](https://clickhouse.com/docs/en/whats-new/changelog/2021/)**<br/>
|
||||
|
||||
### <a id="2211"></a> ClickHouse release 22.11, 2022-11-17
|
||||
|
||||
#### Backward Incompatible Change
|
||||
* `JSONExtract` family of functions will now attempt to coerce to the requested type. [#41502](https://github.com/ClickHouse/ClickHouse/pull/41502) ([Márcio Martins](https://github.com/marcioapm)).
|
||||
|
||||
#### New Feature
|
||||
* Add support for retries during INSERTs into ReplicatedMergeTree when a session with ClickHouse Keeper is lost. Apart from fault tolerance, it aims to provide a better user experience: avoid returning an error to the user during an insert if Keeper is restarted (for example, due to an upgrade). This is controlled by the `insert_keeper_max_retries` setting, which is disabled by default. [#42607](https://github.com/ClickHouse/ClickHouse/pull/42607) ([Igor Nikonov](https://github.com/devcrafter)).
* Add `Hudi` and `DeltaLake` table engines, read-only, only for tables on S3. [#41054](https://github.com/ClickHouse/ClickHouse/pull/41054) ([Daniil Rubin](https://github.com/rubin-do), [Kseniia Sumarokova](https://github.com/kssenii)).
|
||||
* Add table functions `hudi` and `deltaLake`. [#43080](https://github.com/ClickHouse/ClickHouse/pull/43080) ([flynn](https://github.com/ucasfl)).
* Support for composite time intervals. 1. Add, subtract and negate operations are now available on Intervals. In the case where the types of Intervals are different, they will be transformed into the Tuple of those types. 2. A tuple of intervals can be added to or subtracted from a Date/DateTime field. 3. Added parsing of Intervals with different types, for example: `INTERVAL '1 HOUR 1 MINUTE 1 SECOND'`. [#42195](https://github.com/ClickHouse/ClickHouse/pull/42195) ([Nikolay Degterinsky](https://github.com/evillique)).
|
||||
* Added `**` glob support for recursive directory traversal of the filesystem and S3. Resolves [#36316](https://github.com/ClickHouse/ClickHouse/issues/36316). [#42376](https://github.com/ClickHouse/ClickHouse/pull/42376) ([SmitaRKulkarni](https://github.com/SmitaRKulkarni)).
|
||||
* Introduce `s3_plain` disk type for write-once-read-many operations. Implement `ATTACH` of `MergeTree` table for `s3_plain` disk. [#42628](https://github.com/ClickHouse/ClickHouse/pull/42628) ([Azat Khuzhin](https://github.com/azat)).
|
||||
* Added applied row-level policies to `system.query_log`. [#39819](https://github.com/ClickHouse/ClickHouse/pull/39819) ([Vladimir Chebotaryov](https://github.com/quickhouse)).
|
||||
* Add four-letter command `csnp` for manually creating snapshots in ClickHouse Keeper. Additionally, `lgif` was added to get Raft information for a specific node (e.g. index of last created snapshot, last committed log index). [#41766](https://github.com/ClickHouse/ClickHouse/pull/41766) ([JackyWoo](https://github.com/JackyWoo)).
|
||||
* Add function `ascii` like in Apache Spark: https://spark.apache.org/docs/latest/api/sql/#ascii. [#42670](https://github.com/ClickHouse/ClickHouse/pull/42670) ([李扬](https://github.com/taiyang-li)).
|
||||
* Add function `pmod`, which returns a non-negative result based on modulo (see the combined example after this list). [#42755](https://github.com/ClickHouse/ClickHouse/pull/42755) ([李扬](https://github.com/taiyang-li)).
* Add function `formatReadableDecimalSize`. [#42774](https://github.com/ClickHouse/ClickHouse/pull/42774) ([Alejandro](https://github.com/alexon1234)).
|
||||
* Add function `randCanonical`, which is similar to the `rand` function in Apache Spark or Impala. The function generates pseudo-random results that are independent and identically distributed, uniform in [0, 1). [#43124](https://github.com/ClickHouse/ClickHouse/pull/43124) ([李扬](https://github.com/taiyang-li)).
* Add function `displayName`, closes [#36770](https://github.com/ClickHouse/ClickHouse/issues/36770). [#37681](https://github.com/ClickHouse/ClickHouse/pull/37681) ([hongbin](https://github.com/xlwh)).
|
||||
* Add `min_age_to_force_merge_on_partition_only` setting to optimize old parts for the entire partition only. [#42659](https://github.com/ClickHouse/ClickHouse/pull/42659) ([Antonio Andelic](https://github.com/antonio2368)).
|
||||
* Add generic implementation for arbitrary structured named collections, access type and `system.named_collections`. [#43147](https://github.com/ClickHouse/ClickHouse/pull/43147) ([Kseniia Sumarokova](https://github.com/kssenii)).
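A minimal sketch of some of the SQL-level additions above (composite intervals from #42195 and the new functions from #42670, #42755, #42774 and #43124); exact output formatting may differ:

```sql
-- Composite intervals: a multi-unit interval literal can be parsed and added to a DateTime
SELECT toDateTime('2022-11-17 00:00:00') + INTERVAL '1 HOUR 1 MINUTE 1 SECOND' AS shifted;

-- New functions
SELECT
    ascii('ClickHouse')                AS first_byte,        -- 67, the code of 'C'
    pmod(-7, 3)                        AS non_negative_mod,  -- 2, never negative
    formatReadableDecimalSize(1234567) AS readable,          -- something like '1.23 MB'
    randCanonical()                    AS uniform_0_1;       -- Float64 in [0, 1)
```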
#### Performance Improvement
|
||||
* The `match` function can use the index if it is a condition on a string prefix (see the sketch after this list). This closes [#37333](https://github.com/ClickHouse/ClickHouse/issues/37333). [#42458](https://github.com/ClickHouse/ClickHouse/pull/42458) ([clarkcaoliu](https://github.com/Clark0)).
* Speed up AND and OR operators when they are sequenced. [#42214](https://github.com/ClickHouse/ClickHouse/pull/42214) ([Zhiguo Zhou](https://github.com/ZhiguoZh)).
|
||||
* Support parallel parsing for `LineAsString` input format. This improves performance just slightly. This closes [#42502](https://github.com/ClickHouse/ClickHouse/issues/42502). [#42780](https://github.com/ClickHouse/ClickHouse/pull/42780) ([Kruglov Pavel](https://github.com/Avogar)).
|
||||
* ClickHouse Keeper performance improvement: improve commit performance for cases when many different nodes have uncommitted states. This should help with cases when a follower node can't sync fast enough. [#42926](https://github.com/ClickHouse/ClickHouse/pull/42926) ([Antonio Andelic](https://github.com/antonio2368)).
|
||||
* A condition like `NOT LIKE 'prefix%'` can use the primary index. [#42209](https://github.com/ClickHouse/ClickHouse/pull/42209) ([Duc Canh Le](https://github.com/canhld94)).
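A hedged sketch of the two primary-index improvements above; `hits` and `URL` are hypothetical names for a table whose primary key starts with `URL`:

```sql
-- #42458: a regular expression anchored to a string prefix can now use the primary index
SELECT count() FROM hits WHERE match(URL, '^https://example\\.com/');

-- #42209: a negated prefix condition can also be served by the primary index
SELECT count() FROM hits WHERE URL NOT LIKE 'https://ads.example.com/%';

-- EXPLAIN indexes = 1 shows whether the primary key was actually applied
EXPLAIN indexes = 1
SELECT count() FROM hits WHERE URL NOT LIKE 'https://ads.example.com/%';
```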
#### Experimental Feature
|
||||
* Support type `Object` inside other types, e.g. `Array(JSON)` (see the sketch after this list). [#36969](https://github.com/ClickHouse/ClickHouse/pull/36969) ([Anton Popov](https://github.com/CurtizJ)).
* Ignore MySQL binlog SAVEPOINT event for MaterializedMySQL. [#42931](https://github.com/ClickHouse/ClickHouse/pull/42931) ([zzsmdfj](https://github.com/zzsmdfj)). Handle (ignore) SAVEPOINT queries in MaterializedMySQL. [#43086](https://github.com/ClickHouse/ClickHouse/pull/43086) ([Stig Bakken](https://github.com/stigsb)).
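A minimal sketch of `Object` nested inside other types; the table name is made up, and the experimental JSON type still has to be enabled explicitly:

```sql
SET allow_experimental_object_type = 1;

-- #36969: Object('json') (alias JSON) can now be used inside other types such as Array
CREATE TABLE events
(
    id UInt64,
    payloads Array(JSON)
)
ENGINE = MergeTree
ORDER BY id;

DESCRIBE TABLE events;
```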
#### Improvement
|
||||
* Trivial queries with small LIMIT will properly determine the number of estimated rows to read, so that the threshold will be checked properly. Closes [#7071](https://github.com/ClickHouse/ClickHouse/issues/7071). [#42580](https://github.com/ClickHouse/ClickHouse/pull/42580) ([Han Fei](https://github.com/hanfei1991)).
|
||||
* Add support for interactive parameters in INSERT VALUES queries. [#43077](https://github.com/ClickHouse/ClickHouse/pull/43077) ([Nikolay Degterinsky](https://github.com/evillique)).
|
||||
* Added a new field `allow_readonly` to `system.table_functions` to allow using table functions in readonly mode. Resolves [#42414](https://github.com/ClickHouse/ClickHouse/issues/42414). Implementation: added the `allow_readonly` field to the `system.table_functions` table and used it to permit table functions in readonly mode. Testing: added the test `tests/queries/0_stateless/02473_functions_in_readonly_mode.sh`. Documentation: updated the English documentation for table functions. [#42708](https://github.com/ClickHouse/ClickHouse/pull/42708) ([SmitaRKulkarni](https://github.com/SmitaRKulkarni)).
* The `system.asynchronous_metrics` table gets embedded documentation. This documentation is also exported to Prometheus. Fixed an error with the metrics about `cache` disks - they were calculated only for one arbitrary cache disk instead of all of them. This closes [#7644](https://github.com/ClickHouse/ClickHouse/issues/7644). [#43194](https://github.com/ClickHouse/ClickHouse/pull/43194) ([Alexey Milovidov](https://github.com/alexey-milovidov)).
* Throttling algorithm changed to token bucket. [#42665](https://github.com/ClickHouse/ClickHouse/pull/42665) ([Sergei Trifonov](https://github.com/serxa)).
|
||||
* Mask passwords and secret keys both in `system.query_log` and `/var/log/clickhouse-server/*.log` and also in error messages. [#42484](https://github.com/ClickHouse/ClickHouse/pull/42484) ([Vitaly Baranov](https://github.com/vitlibar)).
|
||||
* Remove covered parts for a fetched part (to avoid possible growth of the replication delay). [#39737](https://github.com/ClickHouse/ClickHouse/pull/39737) ([Azat Khuzhin](https://github.com/azat)).
* If `/dev/tty` is available, the progress in clickhouse-client and clickhouse-local will be rendered directly to the terminal, without writing to STDERR. It allows getting progress even if STDERR is redirected to a file, and the file will not be polluted by terminal escape sequences. The progress can be disabled by `--progress false`. This closes [#32238](https://github.com/ClickHouse/ClickHouse/issues/32238). [#42003](https://github.com/ClickHouse/ClickHouse/pull/42003) ([Alexey Milovidov](https://github.com/alexey-milovidov)).
|
||||
* Add support for `FixedString` input to base64 coding functions. [#42285](https://github.com/ClickHouse/ClickHouse/pull/42285) ([ltrk2](https://github.com/ltrk2)).
|
||||
* Add columns `bytes_on_disk` and `path` to `system.detached_parts`. Closes [#42264](https://github.com/ClickHouse/ClickHouse/issues/42264). [#42303](https://github.com/ClickHouse/ClickHouse/pull/42303) ([chen](https://github.com/xiedeyantu)).
|
||||
* Improve using the structure from the insertion table in table functions: the setting `use_structure_from_insertion_table_in_table_functions` has a new possible value `2`, which means that ClickHouse will automatically determine whether the structure from the insertion table can be used. Closes [#40028](https://github.com/ClickHouse/ClickHouse/issues/40028). [#42320](https://github.com/ClickHouse/ClickHouse/pull/42320) ([Kruglov Pavel](https://github.com/Avogar)).
* Fix no progress indication on INSERT FROM INFILE. Closes [#42548](https://github.com/ClickHouse/ClickHouse/issues/42548). [#42634](https://github.com/ClickHouse/ClickHouse/pull/42634) ([chen](https://github.com/xiedeyantu)).
|
||||
* Refactor function `tokens` to enable max tokens returned for related functions (disabled by default). [#42673](https://github.com/ClickHouse/ClickHouse/pull/42673) ([李扬](https://github.com/taiyang-li)).
|
||||
* Allow using `Date32` arguments for the `formatDateTime` and `FROM_UNIXTIME` functions (see the sketch after this list). [#42737](https://github.com/ClickHouse/ClickHouse/pull/42737) ([Roman Vasin](https://github.com/rvasin)).
* Update tzdata to 2022f. Mexico will no longer observe DST except near the US border: https://www.timeanddate.com/news/time/mexico-abolishes-dst-2022.html. Chihuahua moves to year-round UTC-6 on 2022-10-30. Fiji no longer observes DST. See https://github.com/google/cctz/pull/235 and https://bugs.launchpad.net/ubuntu/+source/tzdata/+bug/1995209. [#42796](https://github.com/ClickHouse/ClickHouse/pull/42796) ([Alexey Milovidov](https://github.com/alexey-milovidov)).
|
||||
* Add `FailedAsyncInsertQuery` event metric for async inserts. [#42814](https://github.com/ClickHouse/ClickHouse/pull/42814) ([Krzysztof Góralski](https://github.com/kgoralski)).
|
||||
* Implement `read-in-order` optimization on top of query plan. It is enabled by default. Set `query_plan_read_in_order = 0` to use previous AST-based version. [#42829](https://github.com/ClickHouse/ClickHouse/pull/42829) ([Nikolai Kochetov](https://github.com/KochetovNicolai)).
|
||||
* Increase the size of each upload part exponentially for backups to S3, to avoid errors about the 10,000 parts limit of S3 multipart uploads. [#42833](https://github.com/ClickHouse/ClickHouse/pull/42833) ([Vitaly Baranov](https://github.com/vitlibar)).
* Previously, when merge tasks were continuously busy and disk space was insufficient, completely expired parts could not be selected and dropped, which made the lack of disk space worse. Now, when an entire part is expired, no additional disk space needs to be reserved to drop it, ensuring that TTL is applied normally. [#42869](https://github.com/ClickHouse/ClickHouse/pull/42869) ([zhongyuankai](https://github.com/zhongyuankai)).
* Add the `oss` function and `OSS` table engine (convenient for users). OSS is fully compatible with S3. [#43155](https://github.com/ClickHouse/ClickHouse/pull/43155) ([zzsmdfj](https://github.com/zzsmdfj)).
* Improve error reporting in the collection of OS-related info for the `system.asynchronous_metrics` table. [#43192](https://github.com/ClickHouse/ClickHouse/pull/43192) ([Alexey Milovidov](https://github.com/alexey-milovidov)).
|
||||
* Modify the `INFORMATION_SCHEMA` tables in a way so that ClickHouse can connect to itself using the MySQL compatibility protocol. Add columns instead of aliases (related to [#9769](https://github.com/ClickHouse/ClickHouse/issues/9769)). It will improve the compatibility with various MySQL clients. [#43198](https://github.com/ClickHouse/ClickHouse/pull/43198) ([Filatenkov Artur](https://github.com/FArthur-cmd)).
|
||||
* Add some functions for compatibility with PowerBI when it connects using the MySQL protocol. [#42612](https://github.com/ClickHouse/ClickHouse/pull/42612) ([Filatenkov Artur](https://github.com/FArthur-cmd)).
* Better usability for the Dashboard on changes. [#42872](https://github.com/ClickHouse/ClickHouse/pull/42872) ([Vladimir C](https://github.com/vdimir)).
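Two of the usability improvements above, sketched in SQL; `existing_table` and the file name are placeholders, and behaviour details are best checked against the linked PRs:

```sql
-- #42737: Date32 arguments are now accepted by formatDateTime (and FROM_UNIXTIME)
SELECT formatDateTime(toDate32('1925-01-01'), '%Y-%m-%d') AS formatted;

-- #42320: with value 2, ClickHouse decides automatically whether the structure
-- of the insertion table can be reused for the table function
SET use_structure_from_insertion_table_in_table_functions = 2;
INSERT INTO existing_table SELECT * FROM file('data.csv');
```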
#### Build/Testing/Packaging Improvement
|
||||
* Run SQLancer for each pull request and commit to master. [SQLancer](https://github.com/sqlancer/sqlancer) is an OpenSource fuzzer that focuses on automatic detection of logical bugs. [#42397](https://github.com/ClickHouse/ClickHouse/pull/42397) ([Ilya Yatsishin](https://github.com/qoega)).
|
||||
* Update to latest zlib-ng. [#42463](https://github.com/ClickHouse/ClickHouse/pull/42463) ([Boris Kuschel](https://github.com/bkuschel)).
|
||||
* Add support for testing ClickHouse server with Jepsen. By the way, we already have support for testing ClickHouse Keeper with Jepsen. This pull request extends it to Replicated tables. [#42619](https://github.com/ClickHouse/ClickHouse/pull/42619) ([Antonio Andelic](https://github.com/antonio2368)).
|
||||
* Use https://github.com/matus-chochlik/ctcache for clang-tidy results caching. [#42913](https://github.com/ClickHouse/ClickHouse/pull/42913) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).
|
||||
* Before the fix, the user-defined config was preserved by RPM in `$file.rpmsave`. The PR fixes it and won't replace the user's files from packages. [#42936](https://github.com/ClickHouse/ClickHouse/pull/42936) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).
|
||||
* Remove some libraries from Ubuntu Docker image. [#42622](https://github.com/ClickHouse/ClickHouse/pull/42622) ([Alexey Milovidov](https://github.com/alexey-milovidov)).
|
||||
|
||||
#### Bug Fix (user-visible misbehavior in official stable or prestable release)
|
||||
|
||||
* Updated the normalizer to clone the alias AST. Resolves [#42452](https://github.com/ClickHouse/ClickHouse/issues/42452). Implementation: updated `QueryNormalizer` to clone the alias AST when it is replaced; previously, assigning the same AST led to an exception in `LogicalExpressionsOptimizer`, as the same parent would be inserted again. This bug is not seen with the new analyzer (`allow_experimental_analyzer`), so no changes were made for it. A test was added. [#42827](https://github.com/ClickHouse/ClickHouse/pull/42827) ([SmitaRKulkarni](https://github.com/SmitaRKulkarni)).
* Fix race for backup of tables in `Lazy` databases. [#43104](https://github.com/ClickHouse/ClickHouse/pull/43104) ([Vitaly Baranov](https://github.com/vitlibar)).
|
||||
* Fix for `skip_unavailable_shards`: it did not work with the `s3Cluster` table function. [#43131](https://github.com/ClickHouse/ClickHouse/pull/43131) ([chen](https://github.com/xiedeyantu)).
|
||||
* Fix schema inference in `s3Cluster` and improvement in `hdfsCluster`. [#41979](https://github.com/ClickHouse/ClickHouse/pull/41979) ([Kruglov Pavel](https://github.com/Avogar)).
|
||||
* Fix retries while reading from URL table engines / table function (retriable errors could be retried more times than needed, and non-retriable errors resulted in a failed assertion in the code). [#42224](https://github.com/ClickHouse/ClickHouse/pull/42224) ([Kseniia Sumarokova](https://github.com/kssenii)).
* A segmentation fault related to DNS & c-ares has been reported and fixed. [#42234](https://github.com/ClickHouse/ClickHouse/pull/42234) ([Arthur Passos](https://github.com/arthurpassos)).
|
||||
* Fix `LOGICAL_ERROR` `Arguments of 'plus' have incorrect data types` which may happen in PK analysis (monotonicity check). Fix invalid PK analysis for monotonic binary functions with first constant argument. [#42410](https://github.com/ClickHouse/ClickHouse/pull/42410) ([Nikolai Kochetov](https://github.com/KochetovNicolai)).
|
||||
* Fix incorrect key analysis when key types cannot be inside Nullable. This fixes [#42456](https://github.com/ClickHouse/ClickHouse/issues/42456). [#42469](https://github.com/ClickHouse/ClickHouse/pull/42469) ([Amos Bird](https://github.com/amosbird)).
|
||||
* Fix typo in a setting name that led to bad usage of schema inference cache while using setting `input_format_csv_use_best_effort_in_schema_inference`. Closes [#41735](https://github.com/ClickHouse/ClickHouse/issues/41735). [#42536](https://github.com/ClickHouse/ClickHouse/pull/42536) ([Kruglov Pavel](https://github.com/Avogar)).
|
||||
* Fix creating a Set with wrong header when data type is LowCardinality. Closes [#42460](https://github.com/ClickHouse/ClickHouse/issues/42460). [#42579](https://github.com/ClickHouse/ClickHouse/pull/42579) ([flynn](https://github.com/ucasfl)).
|
||||
* `(U)Int128` and `(U)Int256` values are now correctly checked in `PREWHERE`. [#42605](https://github.com/ClickHouse/ClickHouse/pull/42605) ([Antonio Andelic](https://github.com/antonio2368)).
* Fix a bug in functions parser that could have led to a segmentation fault. [#42724](https://github.com/ClickHouse/ClickHouse/pull/42724) ([Nikolay Degterinsky](https://github.com/evillique)).
|
||||
* Fix the locking in `truncate table`. [#42728](https://github.com/ClickHouse/ClickHouse/pull/42728) ([flynn](https://github.com/ucasfl)).
|
||||
* Fix a possible crash in `web` disks when a file does not exist (or with `OPTIMIZE TABLE FINAL`, which could eventually hit the same error). [#42767](https://github.com/ClickHouse/ClickHouse/pull/42767) ([Azat Khuzhin](https://github.com/azat)).
* Fix `auth_type` mapping in `system.session_log`, by including `SSL_CERTIFICATE` for the enum values. [#42782](https://github.com/ClickHouse/ClickHouse/pull/42782) ([Miel Donkers](https://github.com/mdonkers)).
|
||||
* Fix stack-use-after-return under ASAN build in the Create User query parser. [#42804](https://github.com/ClickHouse/ClickHouse/pull/42804) ([Nikolay Degterinsky](https://github.com/evillique)).
|
||||
* Fix `lowerUTF8`/`upperUTF8` in the case when a symbol crosses a 16-byte boundary (a very frequent case if you have strings longer than 16 bytes). [#42812](https://github.com/ClickHouse/ClickHouse/pull/42812) ([Azat Khuzhin](https://github.com/azat)).
* An additional bounds check was added to the LZ4 decompression routine to fix misbehaviour in case of malformed input. [#42868](https://github.com/ClickHouse/ClickHouse/pull/42868) ([Nikita Taranov](https://github.com/nickitat)).
* Fix rare possible hang on query cancellation. [#42874](https://github.com/ClickHouse/ClickHouse/pull/42874) ([Azat Khuzhin](https://github.com/azat)).
|
||||
* Fix incorrect behavior with multiple disjuncts in hash join, close [#42832](https://github.com/ClickHouse/ClickHouse/issues/42832). [#42876](https://github.com/ClickHouse/ClickHouse/pull/42876) ([Vladimir C](https://github.com/vdimir)).
|
||||
* Fix a null pointer that could be generated when selecting `if` with an alias from a three-table join (see the example SQL query in the PR). [#42883](https://github.com/ClickHouse/ClickHouse/pull/42883) ([zzsmdfj](https://github.com/zzsmdfj)).
* Fix memory sanitizer report in Cluster Discovery, close [#42763](https://github.com/ClickHouse/ClickHouse/issues/42763). [#42905](https://github.com/ClickHouse/ClickHouse/pull/42905) ([Vladimir C](https://github.com/vdimir)).
|
||||
* Improve DateTime schema inference in case of empty string. [#42911](https://github.com/ClickHouse/ClickHouse/pull/42911) ([Kruglov Pavel](https://github.com/Avogar)).
|
||||
* Fix rare NOT_FOUND_COLUMN_IN_BLOCK error when projection is possible to use but there is no projection available. This fixes [#42771](https://github.com/ClickHouse/ClickHouse/issues/42771) . The bug was introduced in https://github.com/ClickHouse/ClickHouse/pull/25563. [#42938](https://github.com/ClickHouse/ClickHouse/pull/42938) ([Amos Bird](https://github.com/amosbird)).
|
||||
* Fix ATTACH TABLE in `PostgreSQL` database engine if the table contains DATETIME data type. Closes [#42817](https://github.com/ClickHouse/ClickHouse/issues/42817). [#42960](https://github.com/ClickHouse/ClickHouse/pull/42960) ([Kseniia Sumarokova](https://github.com/kssenii)).
|
||||
* Fix lambda parsing. Closes [#41848](https://github.com/ClickHouse/ClickHouse/issues/41848). [#42979](https://github.com/ClickHouse/ClickHouse/pull/42979) ([Nikolay Degterinsky](https://github.com/evillique)).
|
||||
* Fix incorrect key analysis when nullable keys appear in the middle of a hyperrectangle. This fixes [#43111](https://github.com/ClickHouse/ClickHouse/issues/43111) . [#43133](https://github.com/ClickHouse/ClickHouse/pull/43133) ([Amos Bird](https://github.com/amosbird)).
|
||||
* Fix several buffer over-reads in deserialization of carefully crafted aggregate function states. [#43159](https://github.com/ClickHouse/ClickHouse/pull/43159) ([Raúl Marín](https://github.com/Algunenano)).
|
||||
* Fix function `if` in case of NULL and const Nullable arguments. Closes [#43069](https://github.com/ClickHouse/ClickHouse/issues/43069). [#43178](https://github.com/ClickHouse/ClickHouse/pull/43178) ([Kruglov Pavel](https://github.com/Avogar)).
|
||||
* Fix decimal math overflow in parsing DateTime with the 'best effort' algorithm. Closes [#43061](https://github.com/ClickHouse/ClickHouse/issues/43061). [#43180](https://github.com/ClickHouse/ClickHouse/pull/43180) ([Kruglov Pavel](https://github.com/Avogar)).
|
||||
* The `indent` field produced by the `git-import` tool was miscalculated. See https://clickhouse.com/docs/en/getting-started/example-datasets/github/. [#43191](https://github.com/ClickHouse/ClickHouse/pull/43191) ([Alexey Milovidov](https://github.com/alexey-milovidov)).
|
||||
* Fixed unexpected behaviour of `Interval` types with subquery and casting. [#43193](https://github.com/ClickHouse/ClickHouse/pull/43193) ([jh0x](https://github.com/jh0x)).
|
||||
|
||||
### <a id="2210"></a> ClickHouse release 22.10, 2022-10-26
|
||||
|
||||
#### Backward Incompatible Change
|
||||
@@ -570,7 +673,7 @@
* Support SQL standard CREATE INDEX and DROP INDEX syntax. [#35166](https://github.com/ClickHouse/ClickHouse/pull/35166) ([Jianmei Zhang](https://github.com/zhangjmruc)).
|
||||
* Send profile events for INSERT queries (previously only SELECT was supported). [#37391](https://github.com/ClickHouse/ClickHouse/pull/37391) ([Azat Khuzhin](https://github.com/azat)).
|
||||
* Implement in order aggregation (`optimize_aggregation_in_order`) for fully materialized projections. [#37469](https://github.com/ClickHouse/ClickHouse/pull/37469) ([Azat Khuzhin](https://github.com/azat)).
|
||||
* Remove subprocess run for kerberos initialization. Added new integration test. Closes [#27651](https://github.com/ClickHouse/ClickHouse/issues/27651). [#38105](https://github.com/ClickHouse/ClickHouse/pull/38105) ([Roman Vasin](https://github.com/rvasin)).
|
||||
* Remove subprocess run for Kerberos initialization. Added new integration test. Closes [#27651](https://github.com/ClickHouse/ClickHouse/issues/27651). [#38105](https://github.com/ClickHouse/ClickHouse/pull/38105) ([Roman Vasin](https://github.com/rvasin)).
|
||||
* Add setting `multiple_joins_try_to_keep_original_names` to not rewrite identifier names on multiple JOINs rewrite, close [#34697](https://github.com/ClickHouse/ClickHouse/issues/34697). [#38149](https://github.com/ClickHouse/ClickHouse/pull/38149) ([Vladimir C](https://github.com/vdimir)).
* Improved trace-visualizer UX. [#38169](https://github.com/ClickHouse/ClickHouse/pull/38169) ([Sergei Trifonov](https://github.com/serxa)).
|
||||
* Enable stack trace collection and query profiler for AArch64. [#38181](https://github.com/ClickHouse/ClickHouse/pull/38181) ([Maksim Kita](https://github.com/kitaisreal)).
|
||||
@@ -850,8 +953,8 @@
|
||||
#### Upgrade Notes
|
||||
|
||||
* Now, background merges, mutations and `OPTIMIZE` will not increment `SelectedRows` and `SelectedBytes` metrics. They (still) will increment `MergedRows` and `MergedUncompressedBytes` as it was before. This only affects the metric values, and makes them better. This change does not introduce any incompatibility, but you may wonder about the changes of metrics, so we put in this category. [#37040](https://github.com/ClickHouse/ClickHouse/pull/37040) ([Nikolai Kochetov](https://github.com/KochetovNicolai)).
|
||||
* Updated the BoringSSL module to the official FIPS compliant version. This makes ClickHouse FIPS compliant. [#35914](https://github.com/ClickHouse/ClickHouse/pull/35914) ([Meena-Renganathan](https://github.com/Meena-Renganathan)). The ciphers `aes-192-cfb128` and `aes-256-cfb128` were removed, because they are not included in the FIPS certified version of BoringSSL.
|
||||
* Now, background merges, mutations, and `OPTIMIZE` will not increment `SelectedRows` and `SelectedBytes` metrics. They (still) will increment `MergedRows` and `MergedUncompressedBytes` as it was before. This only affects the metric values and makes them better. This change does not introduce any incompatibility, but you may wonder about the changes to the metrics, so we put in this category. [#37040](https://github.com/ClickHouse/ClickHouse/pull/37040) ([Nikolai Kochetov](https://github.com/KochetovNicolai)).
|
||||
* Updated the BoringSSL module to the official FIPS compliant version. This makes ClickHouse FIPS compliant in this area. [#35914](https://github.com/ClickHouse/ClickHouse/pull/35914) ([Meena-Renganathan](https://github.com/Meena-Renganathan)). The ciphers `aes-192-cfb128` and `aes-256-cfb128` were removed, because they are not included in the FIPS certified version of BoringSSL.
|
||||
* `max_memory_usage` setting is removed from the default user profile in `users.xml`. This enables flexible memory limits for queries instead of the old rigid limit of 10 GB.
|
||||
* Disable `log_query_threads` setting by default. It controls the logging of statistics about every thread participating in query execution. After supporting asynchronous reads, the total number of distinct thread ids became too large, and logging into the `query_thread_log` has become too heavy. [#37077](https://github.com/ClickHouse/ClickHouse/pull/37077) ([Alexey Milovidov](https://github.com/alexey-milovidov)).
|
||||
* Remove function `groupArraySorted` which has a bug. [#36822](https://github.com/ClickHouse/ClickHouse/pull/36822) ([Alexey Milovidov](https://github.com/alexey-milovidov)).
|
||||
|
@@ -127,10 +127,6 @@ The following settings can be set before query execution or placed into configur
- `s3_min_upload_part_size` — The minimum size of part to upload during multipart upload to [S3 Multipart upload](https://docs.aws.amazon.com/AmazonS3/latest/dev/uploadobjusingmpu.html). Default value is `512Mb`.
|
||||
- `s3_max_redirects` — Max number of S3 redirects hops allowed. Default value is `10`.
|
||||
- `s3_single_read_retries` — The maximum number of attempts during single read. Default value is `4`.
|
||||
- `s3_max_put_rps` — Maximum PUT requests per second rate before throttling. Default value is `0` (unlimited).
|
||||
- `s3_max_put_burst` — Max number of requests that can be issued simultaneously before hitting request per second limit. By default (`0` value) equals to `s3_max_put_rps`.
|
||||
- `s3_max_get_rps` — Maximum GET requests per second rate before throttling. Default value is `0` (unlimited).
|
||||
- `s3_max_get_burst` — Max number of requests that can be issued simultaneously before hitting request per second limit. By default (`0` value) equals to `s3_max_get_rps`.
|
||||
|
||||
Security consideration: if a malicious user can specify arbitrary S3 URLs, `s3_max_redirects` must be set to zero to avoid [SSRF](https://en.wikipedia.org/wiki/Server-side_request_forgery) attacks; alternatively, `remote_host_filter` must be specified in the server configuration.
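A short sketch of how the settings above can be applied per session before querying S3 (the bucket URL is a placeholder):

```sql
-- recommended when URLs may be user-controlled, per the SSRF note above
SET s3_max_redirects = 0;

-- request-rate throttling; the burst values default to the corresponding *_rps when left at 0
SET s3_max_get_rps = 100, s3_max_get_burst = 200;
SET s3_max_put_rps = 50;

SELECT count()
FROM s3('https://my-bucket.s3.amazonaws.com/data/*.csv.gz', 'CSVWithNames');
```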
@@ -146,7 +142,6 @@ The following settings can be specified in configuration file for given endpoint
- `header` — Adds specified HTTP header to a request to given endpoint. Optional, can be specified multiple times.
|
||||
- `server_side_encryption_customer_key_base64` — If specified, required headers for accessing S3 objects with SSE-C encryption will be set. Optional.
|
||||
- `max_single_read_retries` — The maximum number of attempts during single read. Default value is `4`. Optional.
|
||||
- `max_put_rps`, `max_put_burst`, `max_get_rps` and `max_get_burst` - Throttling settings (see description above) to use for specific endpoint instead of per query. Optional.
|
||||
|
||||
**Example:**
|
||||
|
||||
|
@@ -940,10 +940,6 @@ Optional parameters:
- `cache_path` — Path on local FS where to store cached mark and index files. Default value is `/var/lib/clickhouse/disks/<disk_name>/cache/`.
|
||||
- `skip_access_check` — If true, disk access checks will not be performed on disk start-up. Default value is `false`.
|
||||
- `server_side_encryption_customer_key_base64` — If specified, required headers for accessing S3 objects with SSE-C encryption will be set.
|
||||
- `s3_max_put_rps` — Maximum PUT requests per second rate before throttling. Default value is `0` (unlimited).
|
||||
- `s3_max_put_burst` — Max number of requests that can be issued simultaneously before hitting request per second limit. By default (`0` value) equals to `s3_max_put_rps`.
|
||||
- `s3_max_get_rps` — Maximum GET requests per second rate before throttling. Default value is `0` (unlimited).
|
||||
- `s3_max_get_burst` — Max number of requests that can be issued simultaneously before hitting request per second limit. By default (`0` value) equals to `s3_max_get_rps`.
|
||||
|
||||
S3 disk can be configured as `main` or `cold` storage:
|
||||
``` xml
|
||||
|
@@ -6,7 +6,7 @@ sidebar_label: Date32
|
||||
# Date32
|
||||
|
||||
A date. Supports the same date range as [DateTime64](../../sql-reference/data-types/datetime64.md). Stored in four bytes as the number of days since 1900-01-01. Allows storing values till 2299-12-31.
A date. Supports the same date range as [DateTime64](../../sql-reference/data-types/datetime64.md). Stored as a signed 32-bit integer in native byte order with the value representing the days since 1970-01-01 (0 represents 1970-01-01 and negative values represent the days before 1970).
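A quick sketch of the day-number representation described above (the exact conversion function behaviour is worth verifying against the current docs):

```sql
SELECT
    toDate32('1970-01-01')          AS epoch,            -- stored as day number 0
    toDate32('1969-12-31')          AS day_before_epoch, -- stored as day number -1
    toInt32(toDate32('1969-12-31')) AS raw_value;        -- the underlying Int32 value
```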
**Examples**
|
||||
|
||||
|
@ -1517,6 +1517,7 @@ void QueryAnalyzer::collectScopeValidIdentifiersForTypoCorrection(
|
||||
{
|
||||
for (const auto & [name, expression] : scope.alias_name_to_expression_node)
|
||||
{
|
||||
assert(expression);
|
||||
auto expression_identifier = Identifier(name);
|
||||
valid_identifiers_result.insert(expression_identifier);
|
||||
|
||||
@ -2170,6 +2171,18 @@ QueryTreeNodePtr QueryAnalyzer::tryResolveIdentifierFromAliases(const Identifier
|
||||
auto & alias_identifier_node = it->second->as<IdentifierNode &>();
|
||||
auto identifier = alias_identifier_node.getIdentifier();
|
||||
auto lookup_result = tryResolveIdentifier(IdentifierLookup{identifier, identifier_lookup.lookup_context}, scope, identifier_resolve_settings);
|
||||
if (!lookup_result.isResolved())
|
||||
{
|
||||
std::unordered_set<Identifier> valid_identifiers;
|
||||
collectScopeWithParentScopesValidIdentifiersForTypoCorrection(identifier, scope, true, false, false, valid_identifiers);
|
||||
|
||||
auto hints = collectIdentifierTypoHints(identifier, valid_identifiers);
|
||||
throw Exception(ErrorCodes::UNKNOWN_IDENTIFIER, "Unknown {} identifier '{}' in scope {}{}",
|
||||
toStringLowercase(IdentifierLookupContext::EXPRESSION),
|
||||
identifier.getFullName(),
|
||||
scope.scope_node->formatASTForErrorMessage(),
|
||||
getHintsErrorMessageSuffix(hints));
|
||||
}
|
||||
it->second = lookup_result.resolved_identifier;
|
||||
|
||||
/** During collection of aliases if node is identifier and has alias, we cannot say if it is
|
||||
|
@ -46,7 +46,7 @@ namespace
|
||||
context->getRemoteHostFilter(),
|
||||
static_cast<unsigned>(context->getGlobalContext()->getSettingsRef().s3_max_redirects),
|
||||
context->getGlobalContext()->getSettingsRef().enable_s3_requests_logging,
|
||||
/* for_disk_s3 = */ false, /* get_request_throttler = */ {}, /* put_request_throttler = */ {});
|
||||
/* for_disk_s3 = */ false);
|
||||
|
||||
client_configuration.endpointOverride = s3_uri.endpoint;
|
||||
client_configuration.maxConnections = static_cast<unsigned>(context->getSettingsRef().s3_max_connections);
|
||||
@ -86,10 +86,9 @@ BackupReaderS3::BackupReaderS3(
|
||||
const S3::URI & s3_uri_, const String & access_key_id_, const String & secret_access_key_, const ContextPtr & context_)
|
||||
: s3_uri(s3_uri_)
|
||||
, client(makeS3Client(s3_uri_, access_key_id_, secret_access_key_, context_))
|
||||
, max_single_read_retries(context_->getSettingsRef().s3_max_single_read_retries)
|
||||
, read_settings(context_->getReadSettings())
|
||||
, request_settings(context_->getStorageS3Settings().getSettings(s3_uri.uri.toString()).request_settings)
|
||||
{
|
||||
request_settings.max_single_read_retries = context_->getSettingsRef().s3_max_single_read_retries; // FIXME: Avoid taking value for endpoint
|
||||
}
|
||||
|
||||
DataSourceDescription BackupReaderS3::getDataSourceDescription() const
|
||||
@ -116,7 +115,7 @@ UInt64 BackupReaderS3::getFileSize(const String & file_name)
|
||||
std::unique_ptr<SeekableReadBuffer> BackupReaderS3::readFile(const String & file_name)
|
||||
{
|
||||
return std::make_unique<ReadBufferFromS3>(
|
||||
client, s3_uri.bucket, fs::path(s3_uri.key) / file_name, s3_uri.version_id, request_settings, read_settings);
|
||||
client, s3_uri.bucket, fs::path(s3_uri.key) / file_name, s3_uri.version_id, max_single_read_retries, read_settings);
|
||||
}
|
||||
|
||||
|
||||
@ -124,12 +123,12 @@ BackupWriterS3::BackupWriterS3(
|
||||
const S3::URI & s3_uri_, const String & access_key_id_, const String & secret_access_key_, const ContextPtr & context_)
|
||||
: s3_uri(s3_uri_)
|
||||
, client(makeS3Client(s3_uri_, access_key_id_, secret_access_key_, context_))
|
||||
, max_single_read_retries(context_->getSettingsRef().s3_max_single_read_retries)
|
||||
, read_settings(context_->getReadSettings())
|
||||
, request_settings(context_->getStorageS3Settings().getSettings(s3_uri.uri.toString()).request_settings)
|
||||
, rw_settings(context_->getStorageS3Settings().getSettings(s3_uri.uri.toString()).rw_settings)
|
||||
, log(&Poco::Logger::get("BackupWriterS3"))
|
||||
{
|
||||
request_settings.updateFromSettingsIfEmpty(context_->getSettingsRef());
|
||||
request_settings.max_single_read_retries = context_->getSettingsRef().s3_max_single_read_retries; // FIXME: Avoid taking value for endpoint
|
||||
rw_settings.updateFromSettingsIfEmpty(context_->getSettingsRef());
|
||||
}
|
||||
|
||||
DataSourceDescription BackupWriterS3::getDataSourceDescription() const
|
||||
@ -217,7 +216,7 @@ void BackupWriterS3::copyObjectMultipartImpl(
|
||||
std::vector<String> part_tags;
|
||||
|
||||
size_t position = 0;
|
||||
size_t upload_part_size = request_settings.min_upload_part_size;
|
||||
size_t upload_part_size = rw_settings.min_upload_part_size;
|
||||
|
||||
for (size_t part_number = 1; position < size; ++part_number)
|
||||
{
|
||||
@ -249,10 +248,10 @@ void BackupWriterS3::copyObjectMultipartImpl(
|
||||
|
||||
position = next_position;
|
||||
|
||||
if (part_number % request_settings.upload_part_size_multiply_parts_count_threshold == 0)
|
||||
if (part_number % rw_settings.upload_part_size_multiply_parts_count_threshold == 0)
|
||||
{
|
||||
upload_part_size *= request_settings.upload_part_size_multiply_factor;
|
||||
upload_part_size = std::min(upload_part_size, request_settings.max_upload_part_size);
|
||||
upload_part_size *= rw_settings.upload_part_size_multiply_factor;
|
||||
upload_part_size = std::min(upload_part_size, rw_settings.max_upload_part_size);
|
||||
}
|
||||
}
|
||||
|
||||
@ -295,7 +294,7 @@ void BackupWriterS3::copyFileNative(DiskPtr from_disk, const String & file_name_
|
||||
auto file_path = fs::path(s3_uri.key) / file_name_to;
|
||||
|
||||
auto head = requestObjectHeadData(source_bucket, objects[0].absolute_path).GetResult();
|
||||
if (static_cast<size_t>(head.GetContentLength()) < request_settings.max_single_operation_copy_size)
|
||||
if (static_cast<size_t>(head.GetContentLength()) < rw_settings.max_single_operation_copy_size)
|
||||
{
|
||||
copyObjectImpl(
|
||||
source_bucket, objects[0].absolute_path, s3_uri.bucket, file_path, head);
|
||||
@ -332,7 +331,7 @@ bool BackupWriterS3::fileContentsEqual(const String & file_name, const String &
|
||||
try
|
||||
{
|
||||
auto in = std::make_unique<ReadBufferFromS3>(
|
||||
client, s3_uri.bucket, fs::path(s3_uri.key) / file_name, s3_uri.version_id, request_settings, read_settings);
|
||||
client, s3_uri.bucket, fs::path(s3_uri.key) / file_name, s3_uri.version_id, max_single_read_retries, read_settings);
|
||||
String actual_file_contents(expected_file_contents.size(), ' ');
|
||||
return (in->read(actual_file_contents.data(), actual_file_contents.size()) == actual_file_contents.size())
|
||||
&& (actual_file_contents == expected_file_contents) && in->eof();
|
||||
@ -350,7 +349,7 @@ std::unique_ptr<WriteBuffer> BackupWriterS3::writeFile(const String & file_name)
|
||||
client,
|
||||
s3_uri.bucket,
|
||||
fs::path(s3_uri.key) / file_name,
|
||||
request_settings,
|
||||
rw_settings,
|
||||
std::nullopt,
|
||||
DBMS_DEFAULT_BUFFER_SIZE,
|
||||
threadPoolCallbackRunner<void>(IOThreadPool::get(), "BackupWriterS3"));
|
||||
|
@ -39,8 +39,8 @@ public:
|
||||
private:
|
||||
S3::URI s3_uri;
|
||||
std::shared_ptr<Aws::S3::S3Client> client;
|
||||
UInt64 max_single_read_retries;
|
||||
ReadSettings read_settings;
|
||||
S3Settings::RequestSettings request_settings;
|
||||
};
|
||||
|
||||
|
||||
@ -81,8 +81,9 @@ private:
|
||||
|
||||
S3::URI s3_uri;
|
||||
std::shared_ptr<Aws::S3::S3Client> client;
|
||||
UInt64 max_single_read_retries;
|
||||
ReadSettings read_settings;
|
||||
S3Settings::RequestSettings request_settings;
|
||||
S3Settings::ReadWriteSettings rw_settings;
|
||||
Poco::Logger * log;
|
||||
};
|
||||
|
||||
|
@ -62,7 +62,7 @@
|
||||
M(NetworkSendElapsedMicroseconds, "Total time spent waiting for data to send to network or sending data to network. Only ClickHouse-related network interaction is included, not by 3rd party libraries..") \
|
||||
M(NetworkReceiveBytes, "Total number of bytes received from network. Only ClickHouse-related network interaction is included, not by 3rd party libraries.") \
|
||||
M(NetworkSendBytes, "Total number of bytes send to network. Only ClickHouse-related network interaction is included, not by 3rd party libraries.") \
|
||||
M(ThrottlerSleepMicroseconds, "Total time a query was sleeping to conform 'max_network_bandwidth' and other throttling settings.") \
|
||||
M(ThrottlerSleepMicroseconds, "Total time a query was sleeping to conform the 'max_network_bandwidth' setting.") \
|
||||
\
|
||||
M(QueryMaskingRulesMatch, "Number of times query masking rules was successfully matched.") \
|
||||
\
|
||||
|
@ -20,6 +20,8 @@ namespace ErrorCodes
|
||||
/// Just 10^9.
|
||||
static constexpr auto NS = 1000000000UL;
|
||||
|
||||
static const size_t default_burst_seconds = 1;
|
||||
|
||||
Throttler::Throttler(size_t max_speed_, const std::shared_ptr<Throttler> & parent_)
|
||||
: max_speed(max_speed_)
|
||||
, max_burst(max_speed_ * default_burst_seconds)
|
||||
|
@ -17,8 +17,6 @@ namespace DB
|
||||
class Throttler
|
||||
{
|
||||
public:
|
||||
static const size_t default_burst_seconds = 1;
|
||||
|
||||
Throttler(size_t max_speed_, size_t max_burst_, const std::shared_ptr<Throttler> & parent_ = nullptr)
|
||||
: max_speed(max_speed_), max_burst(max_burst_), limit_exceeded_exception_message(""), tokens(max_burst), parent(parent_) {}
|
||||
|
||||
|
@ -93,7 +93,7 @@ void KeeperSnapshotManagerS3::updateS3Configuration(const Poco::Util::AbstractCo
|
||||
auth_settings.region,
|
||||
RemoteHostFilter(), s3_max_redirects,
|
||||
enable_s3_requests_logging,
|
||||
/* for_disk_s3 = */ false, /* get_request_throttler = */ {}, /* put_request_throttler = */ {});
|
||||
/* for_disk_s3 = */ false);
|
||||
|
||||
client_configuration.endpointOverride = new_uri.endpoint;
|
||||
|
||||
@ -135,8 +135,8 @@ void KeeperSnapshotManagerS3::uploadSnapshotImpl(const std::string & snapshot_pa
|
||||
if (s3_client == nullptr)
|
||||
return;
|
||||
|
||||
S3Settings::RequestSettings request_settings_1;
|
||||
request_settings_1.upload_part_size_multiply_parts_count_threshold = 10000;
|
||||
S3Settings::ReadWriteSettings read_write_settings;
|
||||
read_write_settings.upload_part_size_multiply_parts_count_threshold = 10000;
|
||||
|
||||
const auto create_writer = [&](const auto & key)
|
||||
{
|
||||
@ -145,7 +145,7 @@ void KeeperSnapshotManagerS3::uploadSnapshotImpl(const std::string & snapshot_pa
|
||||
s3_client->client,
|
||||
s3_client->uri.bucket,
|
||||
key,
|
||||
request_settings_1
|
||||
read_write_settings
|
||||
};
|
||||
};
|
||||
|
||||
@ -194,15 +194,13 @@ void KeeperSnapshotManagerS3::uploadSnapshotImpl(const std::string & snapshot_pa
|
||||
lock_writer.finalize();
|
||||
|
||||
// We read back the written UUID, if it's the same we can upload the file
|
||||
S3Settings::RequestSettings request_settings_2;
|
||||
request_settings_2.max_single_read_retries = 1;
|
||||
ReadBufferFromS3 lock_reader
|
||||
{
|
||||
s3_client->client,
|
||||
s3_client->uri.bucket,
|
||||
lock_file,
|
||||
"",
|
||||
request_settings_2,
|
||||
1,
|
||||
{}
|
||||
};
|
||||
|
||||
|
@ -90,10 +90,6 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
|
||||
M(UInt64, s3_max_unexpected_write_error_retries, 4, "The maximum number of retries in case of unexpected errors during S3 write.", 0) \
|
||||
M(UInt64, s3_max_redirects, 10, "Max number of S3 redirects hops allowed.", 0) \
|
||||
M(UInt64, s3_max_connections, 1024, "The maximum number of connections per server.", 0) \
|
||||
M(UInt64, s3_max_get_rps, 0, "Limit on S3 GET request per second rate before throttling. Zero means unlimited.", 0) \
|
||||
M(UInt64, s3_max_get_burst, 0, "Max number of requests that can be issued simultaneously before hitting request per second limit. By default (0) equals to `s3_max_get_rps`", 0) \
|
||||
M(UInt64, s3_max_put_rps, 0, "Limit on S3 PUT request per second rate before throttling. Zero means unlimited.", 0) \
|
||||
M(UInt64, s3_max_put_burst, 0, "Max number of requests that can be issued simultaneously before hitting request per second limit. By default (0) equals to `s3_max_put_rps`", 0) \
|
||||
M(Bool, s3_truncate_on_insert, false, "Enables or disables truncate before insert in s3 engine tables.", 0) \
|
||||
M(Bool, s3_create_new_file_on_insert, false, "Enables or disables creating a new file on each insert in s3 engine tables", 0) \
|
||||
M(Bool, s3_check_objects_after_upload, false, "Check each uploaded object to s3 with head request to be sure that upload was successful", 0) \
|
||||
|
@ -175,7 +175,7 @@ std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
|
||||
bucket,
|
||||
path,
|
||||
version_id,
|
||||
settings_ptr->request_settings,
|
||||
settings_ptr->s3_settings.max_single_read_retries,
|
||||
disk_read_settings,
|
||||
/* use_external_buffer */true,
|
||||
/* offset */0,
|
||||
@ -212,7 +212,7 @@ std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObject( /// NOLINT
|
||||
bucket,
|
||||
object.absolute_path,
|
||||
version_id,
|
||||
settings_ptr->request_settings,
|
||||
settings_ptr->s3_settings.max_single_read_retries,
|
||||
patchSettings(read_settings));
|
||||
}
|
||||
|
||||
@ -238,7 +238,7 @@ std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLIN
|
||||
client.get(),
|
||||
bucket,
|
||||
object.absolute_path,
|
||||
settings_ptr->request_settings,
|
||||
settings_ptr->s3_settings,
|
||||
attributes,
|
||||
buf_size,
|
||||
std::move(scheduler),
|
||||
@ -489,7 +489,7 @@ void S3ObjectStorage::copyObjectImpl(
|
||||
throwIfError(outcome);
|
||||
|
||||
auto settings_ptr = s3_settings.get();
|
||||
if (settings_ptr->request_settings.check_objects_after_upload)
|
||||
if (settings_ptr->s3_settings.check_objects_after_upload)
|
||||
{
|
||||
auto object_head = requestObjectHeadData(dst_bucket, dst_key);
|
||||
if (!object_head.IsSuccess())
|
||||
@ -533,7 +533,7 @@ void S3ObjectStorage::copyObjectMultipartImpl(
|
||||
|
||||
std::vector<String> part_tags;
|
||||
|
||||
size_t upload_part_size = settings_ptr->request_settings.min_upload_part_size;
|
||||
size_t upload_part_size = settings_ptr->s3_settings.min_upload_part_size;
|
||||
for (size_t position = 0, part_number = 1; position < size; ++part_number, position += upload_part_size)
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::S3UploadPartCopy);
|
||||
@ -586,7 +586,7 @@ void S3ObjectStorage::copyObjectMultipartImpl(
|
||||
throwIfError(outcome);
|
||||
}
|
||||
|
||||
if (settings_ptr->request_settings.check_objects_after_upload)
|
||||
if (settings_ptr->s3_settings.check_objects_after_upload)
|
||||
{
|
||||
auto object_head = requestObjectHeadData(dst_bucket, dst_key);
|
||||
if (!object_head.IsSuccess())
|
||||
@ -643,20 +643,17 @@ void S3ObjectStorage::startup()
|
||||
|
||||
void S3ObjectStorage::applyNewSettings(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context)
|
||||
{
|
||||
auto new_s3_settings = getSettings(config, config_prefix, context);
|
||||
auto new_client = getClient(config, config_prefix, context, *new_s3_settings);
|
||||
s3_settings.set(std::move(new_s3_settings));
|
||||
client.set(std::move(new_client));
|
||||
s3_settings.set(getSettings(config, config_prefix, context));
|
||||
client.set(getClient(config, config_prefix, context));
|
||||
applyRemoteThrottlingSettings(context);
|
||||
}
|
||||
|
||||
std::unique_ptr<IObjectStorage> S3ObjectStorage::cloneObjectStorage(
|
||||
const std::string & new_namespace, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context)
|
||||
{
|
||||
auto new_s3_settings = getSettings(config, config_prefix, context);
|
||||
auto new_client = getClient(config, config_prefix, context, *new_s3_settings);
|
||||
return std::make_unique<S3ObjectStorage>(
|
||||
std::move(new_client), std::move(new_s3_settings),
|
||||
getClient(config, config_prefix, context),
|
||||
getSettings(config, config_prefix, context),
|
||||
version_id, s3_capabilities, new_namespace,
|
||||
S3::URI(Poco::URI(config.getString(config_prefix + ".endpoint"))).endpoint);
|
||||
}
|
||||
|
@ -23,17 +23,17 @@ struct S3ObjectStorageSettings
|
||||
S3ObjectStorageSettings() = default;
|
||||
|
||||
S3ObjectStorageSettings(
|
||||
const S3Settings::RequestSettings & request_settings_,
|
||||
const S3Settings::ReadWriteSettings & s3_settings_,
|
||||
uint64_t min_bytes_for_seek_,
|
||||
int32_t list_object_keys_size_,
|
||||
int32_t objects_chunk_size_to_delete_)
|
||||
: request_settings(request_settings_)
|
||||
: s3_settings(s3_settings_)
|
||||
, min_bytes_for_seek(min_bytes_for_seek_)
|
||||
, list_object_keys_size(list_object_keys_size_)
|
||||
, objects_chunk_size_to_delete(objects_chunk_size_to_delete_)
|
||||
{}
|
||||
|
||||
S3Settings::RequestSettings request_settings;
|
||||
S3Settings::ReadWriteSettings s3_settings;
|
||||
|
||||
uint64_t min_bytes_for_seek;
|
||||
int32_t list_object_keys_size;
|
||||
|
@ -4,7 +4,6 @@
|
||||
|
||||
#include <Common/StringUtils/StringUtils.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Common/Throttler.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <Interpreters/Context.h>
|
||||
@ -33,26 +32,17 @@ namespace ErrorCodes
|
||||
|
||||
std::unique_ptr<S3ObjectStorageSettings> getSettings(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, ContextPtr context)
|
||||
{
|
||||
const Settings & settings = context->getSettingsRef();
|
||||
S3Settings::RequestSettings request_settings;
|
||||
request_settings.max_single_read_retries = config.getUInt64(config_prefix + ".s3_max_single_read_retries", settings.s3_max_single_read_retries);
|
||||
request_settings.min_upload_part_size = config.getUInt64(config_prefix + ".s3_min_upload_part_size", settings.s3_min_upload_part_size);
|
||||
request_settings.upload_part_size_multiply_factor = config.getUInt64(config_prefix + ".s3_upload_part_size_multiply_factor", settings.s3_upload_part_size_multiply_factor);
|
||||
request_settings.upload_part_size_multiply_parts_count_threshold = config.getUInt64(config_prefix + ".s3_upload_part_size_multiply_parts_count_threshold", settings.s3_upload_part_size_multiply_parts_count_threshold);
|
||||
request_settings.max_single_part_upload_size = config.getUInt64(config_prefix + ".s3_max_single_part_upload_size", settings.s3_max_single_part_upload_size);
|
||||
request_settings.check_objects_after_upload = config.getUInt64(config_prefix + ".s3_check_objects_after_upload", settings.s3_check_objects_after_upload);
|
||||
request_settings.max_unexpected_write_error_retries = config.getUInt64(config_prefix + ".s3_max_unexpected_write_error_retries", settings.s3_max_unexpected_write_error_retries);
|
||||
|
||||
// NOTE: it would be better to reuse old throttlers to avoid losing token bucket state on every config reload, which could lead to exceeding limit for short time. But it is good enough unless very high `burst` values are used.
|
||||
if (UInt64 max_get_rps = config.getUInt64(config_prefix + ".s3_max_get_rps", settings.s3_max_get_rps))
|
||||
request_settings.get_request_throttler = std::make_shared<Throttler>(
|
||||
max_get_rps, config.getUInt64(config_prefix + ".s3_max_get_burst", settings.s3_max_get_burst ? settings.s3_max_get_burst : Throttler::default_burst_seconds * max_get_rps));
|
||||
if (UInt64 max_put_rps = config.getUInt64(config_prefix + ".s3_max_put_rps", settings.s3_max_put_rps))
|
||||
request_settings.put_request_throttler = std::make_shared<Throttler>(
|
||||
max_put_rps, config.getUInt64(config_prefix + ".s3_max_put_burst", settings.s3_max_put_burst ? settings.s3_max_put_burst : Throttler::default_burst_seconds * max_put_rps));
|
||||
S3Settings::ReadWriteSettings rw_settings;
|
||||
rw_settings.max_single_read_retries = config.getUInt64(config_prefix + ".s3_max_single_read_retries", context->getSettingsRef().s3_max_single_read_retries);
|
||||
rw_settings.min_upload_part_size = config.getUInt64(config_prefix + ".s3_min_upload_part_size", context->getSettingsRef().s3_min_upload_part_size);
|
||||
rw_settings.upload_part_size_multiply_factor = config.getUInt64(config_prefix + ".s3_upload_part_size_multiply_factor", context->getSettingsRef().s3_upload_part_size_multiply_factor);
|
||||
rw_settings.upload_part_size_multiply_parts_count_threshold = config.getUInt64(config_prefix + ".s3_upload_part_size_multiply_parts_count_threshold", context->getSettingsRef().s3_upload_part_size_multiply_parts_count_threshold);
|
||||
rw_settings.max_single_part_upload_size = config.getUInt64(config_prefix + ".s3_max_single_part_upload_size", context->getSettingsRef().s3_max_single_part_upload_size);
|
||||
rw_settings.check_objects_after_upload = config.getUInt64(config_prefix + ".s3_check_objects_after_upload", context->getSettingsRef().s3_check_objects_after_upload);
|
||||
rw_settings.max_unexpected_write_error_retries = config.getUInt64(config_prefix + ".s3_max_unexpected_write_error_retries", context->getSettingsRef().s3_max_unexpected_write_error_retries);
|
||||
|
||||
return std::make_unique<S3ObjectStorageSettings>(
|
||||
request_settings,
|
||||
rw_settings,
|
||||
config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024),
|
||||
config.getInt(config_prefix + ".list_object_keys_size", 1000),
|
||||
config.getInt(config_prefix + ".objects_chunk_size_to_delete", 1000));
|
||||
@ -122,20 +112,14 @@ std::shared_ptr<S3::ProxyConfiguration> getProxyConfiguration(const String & pre
|
||||
}
|
||||
|
||||
|
||||
std::unique_ptr<Aws::S3::S3Client> getClient(
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
const String & config_prefix,
|
||||
ContextPtr context,
|
||||
const S3ObjectStorageSettings & settings)
|
||||
std::unique_ptr<Aws::S3::S3Client> getClient(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, ContextPtr context)
|
||||
{
|
||||
S3::PocoHTTPClientConfiguration client_configuration = S3::ClientFactory::instance().createClientConfiguration(
|
||||
config.getString(config_prefix + ".region", ""),
|
||||
context->getRemoteHostFilter(),
|
||||
static_cast<int>(context->getGlobalContext()->getSettingsRef().s3_max_redirects),
|
||||
context->getGlobalContext()->getSettingsRef().enable_s3_requests_logging,
|
||||
/* for_disk_s3 = */ true,
|
||||
settings.request_settings.get_request_throttler,
|
||||
settings.request_settings.put_request_throttler);
|
||||
/* for_disk_s3 = */ true);
|
||||
|
||||
S3::URI uri(Poco::URI(config.getString(config_prefix + ".endpoint")));
|
||||
if (uri.key.back() != '/')
|
||||
|
@ -22,7 +22,7 @@ struct S3ObjectStorageSettings;
|
||||
|
||||
std::unique_ptr<S3ObjectStorageSettings> getSettings(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, ContextPtr context);
|
||||
|
||||
std::unique_ptr<Aws::S3::S3Client> getClient(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, ContextPtr context, const S3ObjectStorageSettings & settings);
|
||||
std::unique_ptr<Aws::S3::S3Client> getClient(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, ContextPtr context);
|
||||
|
||||
}
|
||||
|
||||
|
src/Disks/ObjectStorages/S3/parseConfig.h (new file, 27 lines)
@@ -0,0 +1,27 @@
|
||||
#pragma once
|
||||
|
||||
#include "config.h"
|
||||
|
||||
#if USE_AWS_S3
|
||||
|
||||
#include <aws/core/client/DefaultRetryStrategy.h>
|
||||
#include <IO/S3Common.h>
|
||||
#include <Storages/StorageS3Settings.h>
|
||||
#include <Disks/ObjectStorages/S3/ProxyConfiguration.h>
|
||||
#include <Disks/ObjectStorages/S3/ProxyListConfiguration.h>
|
||||
#include <Disks/ObjectStorages/S3/ProxyResolverConfiguration.h>
|
||||
#include <Disks/DiskRestartProxy.h>
|
||||
#include <Disks/DiskLocal.h>
|
||||
#include <Disks/ObjectStorages/DiskObjectStorageCommon.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
|
||||
std::unique_ptr<DiskS3Settings> getSettings(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, ContextPtr context);
|
||||
|
||||
std::shared_ptr<Aws::S3::S3Client> getClient(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, ContextPtr context);
|
||||
|
||||
|
||||
}
|
@@ -130,16 +130,21 @@ void registerDiskS3(DiskFactory & factory)
chassert(type == "s3" || type == "s3_plain");

MetadataStoragePtr metadata_storage;
auto settings = getSettings(config, config_prefix, context);
auto client = getClient(config, config_prefix, context, *settings);
if (type == "s3_plain")
{
s3_storage = std::make_shared<S3PlainObjectStorage>(std::move(client), std::move(settings), uri.version_id, s3_capabilities, uri.bucket, uri.endpoint);
s3_storage = std::make_shared<S3PlainObjectStorage>(
getClient(config, config_prefix, context),
getSettings(config, config_prefix, context),
uri.version_id, s3_capabilities, uri.bucket, uri.endpoint);
metadata_storage = std::make_shared<MetadataStorageFromPlainObjectStorage>(s3_storage, uri.key);
}
else
{
s3_storage = std::make_shared<S3ObjectStorage>(std::move(client), std::move(settings), uri.version_id, s3_capabilities, uri.bucket, uri.endpoint);
s3_storage = std::make_shared<S3ObjectStorage>(
getClient(config, config_prefix, context),
getSettings(config, config_prefix, context),
uri.version_id, s3_capabilities, uri.bucket, uri.endpoint);

auto [metadata_path, metadata_disk] = prepareForLocalMetadata(name, config, config_prefix, context);
metadata_storage = std::make_shared<MetadataStorageFromDisk>(metadata_disk, uri.key);
}
@@ -45,7 +45,7 @@ ReadBufferFromS3::ReadBufferFromS3(
const String & bucket_,
const String & key_,
const String & version_id_,
const S3Settings::RequestSettings & request_settings_,
UInt64 max_single_read_retries_,
const ReadSettings & settings_,
bool use_external_buffer_,
size_t offset_,
@@ -56,7 +56,7 @@ ReadBufferFromS3::ReadBufferFromS3(
, bucket(bucket_)
, key(key_)
, version_id(version_id_)
, request_settings(request_settings_)
, max_single_read_retries(max_single_read_retries_)
, offset(offset_)
, read_until_position(read_until_position_)
, read_settings(settings_)
@@ -105,7 +105,7 @@ bool ReadBufferFromS3::nextImpl()
}

size_t sleep_time_with_backoff_milliseconds = 100;
for (size_t attempt = 0; attempt < request_settings.max_single_read_retries && !next_result; ++attempt)
for (size_t attempt = 0; (attempt < max_single_read_retries) && !next_result; ++attempt)
{
Stopwatch watch;
try
@@ -166,7 +166,7 @@ bool ReadBufferFromS3::nextImpl()
attempt,
e.message());

if (attempt + 1 == request_settings.max_single_read_retries)
if (attempt + 1 == max_single_read_retries)
throw;

/// Pause before next attempt.
@@ -349,7 +349,7 @@ SeekableReadBufferPtr ReadBufferS3Factory::getReader()
bucket,
key,
version_id,
request_settings,
s3_max_single_read_retries,
read_settings,
false /*use_external_buffer*/,
next_range->first,
@@ -1,7 +1,6 @@
#pragma once

#include <Common/RangeGenerator.h>
#include <Storages/StorageS3Settings.h>
#include "config.h"

#if USE_AWS_S3
@@ -34,7 +33,7 @@ private:
String bucket;
String key;
String version_id;
const S3Settings::RequestSettings request_settings;
UInt64 max_single_read_retries;

/// These variables are atomic because they can be used for `logging only`
/// (where it is not important to get consistent result)
@@ -53,7 +52,7 @@ public:
const String & bucket_,
const String & key_,
const String & version_id_,
const S3Settings::RequestSettings & request_settings_,
UInt64 max_single_read_retries_,
const ReadSettings & settings_,
bool use_external_buffer = false,
size_t offset_ = 0,
@@ -101,7 +100,7 @@ public:
const String & version_id_,
size_t range_step_,
size_t object_size_,
const S3Settings::RequestSettings & request_settings_,
UInt64 s3_max_single_read_retries_,
const ReadSettings & read_settings_)
: client_ptr(client_ptr_)
, bucket(bucket_)
@@ -111,7 +110,7 @@ public:
, range_generator(object_size_, range_step_)
, range_step(range_step_)
, object_size(object_size_)
, request_settings(request_settings_)
, s3_max_single_read_retries(s3_max_single_read_retries_)
{
assert(range_step > 0);
assert(range_step < object_size);
@@ -136,7 +135,7 @@ private:
size_t range_step;
size_t object_size;

const S3Settings::RequestSettings request_settings;
UInt64 s3_max_single_read_retries;
};

}
@@ -11,7 +11,6 @@

#include <Common/logger_useful.h>
#include <Common/Stopwatch.h>
#include <Common/Throttler.h>
#include <IO/HTTPCommon.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
@@ -77,16 +76,12 @@ PocoHTTPClientConfiguration::PocoHTTPClientConfiguration(
const RemoteHostFilter & remote_host_filter_,
unsigned int s3_max_redirects_,
bool enable_s3_requests_logging_,
bool for_disk_s3_,
const ThrottlerPtr & get_request_throttler_,
const ThrottlerPtr & put_request_throttler_)
bool for_disk_s3_)
: force_region(force_region_)
, remote_host_filter(remote_host_filter_)
, s3_max_redirects(s3_max_redirects_)
, enable_s3_requests_logging(enable_s3_requests_logging_)
, for_disk_s3(for_disk_s3_)
, get_request_throttler(get_request_throttler_)
, put_request_throttler(put_request_throttler_)
{
}

@@ -133,8 +128,6 @@ PocoHTTPClient::PocoHTTPClient(const PocoHTTPClientConfiguration & client_config
, s3_max_redirects(client_configuration.s3_max_redirects)
, enable_s3_requests_logging(client_configuration.enable_s3_requests_logging)
, for_disk_s3(client_configuration.for_disk_s3)
, get_request_throttler(client_configuration.get_request_throttler)
, put_request_throttler(client_configuration.put_request_throttler)
, extra_headers(client_configuration.extra_headers)
{
}
@@ -252,23 +245,6 @@ void PocoHTTPClient::makeRequestInternal(
if (enable_s3_requests_logging)
LOG_TEST(log, "Make request to: {}", uri);

switch (request.GetMethod())
{
case Aws::Http::HttpMethod::HTTP_GET:
case Aws::Http::HttpMethod::HTTP_HEAD:
if (get_request_throttler)
get_request_throttler->add(1);
break;
case Aws::Http::HttpMethod::HTTP_PUT:
case Aws::Http::HttpMethod::HTTP_POST:
case Aws::Http::HttpMethod::HTTP_PATCH:
if (put_request_throttler)
put_request_throttler->add(1);
break;
case Aws::Http::HttpMethod::HTTP_DELETE:
break; // Not throttled
}

addMetric(request, S3MetricType::Count);
CurrentMetrics::Increment metric_increment{CurrentMetrics::S3Requests};
@@ -8,7 +8,6 @@
#if USE_AWS_S3

#include <Common/RemoteHostFilter.h>
#include <Common/Throttler_fwd.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/HTTPCommon.h>
#include <IO/S3/SessionAwareIOStream.h>
@@ -49,8 +48,6 @@ struct PocoHTTPClientConfiguration : public Aws::Client::ClientConfiguration
unsigned int s3_max_redirects;
bool enable_s3_requests_logging;
bool for_disk_s3;
ThrottlerPtr get_request_throttler;
ThrottlerPtr put_request_throttler;
HeaderCollection extra_headers;

void updateSchemeAndRegion();
@@ -63,9 +60,7 @@ private:
const RemoteHostFilter & remote_host_filter_,
unsigned int s3_max_redirects_,
bool enable_s3_requests_logging_,
bool for_disk_s3_,
const ThrottlerPtr & get_request_throttler_,
const ThrottlerPtr & put_request_throttler_
bool for_disk_s3_
);

/// Constructor of Aws::Client::ClientConfiguration must be called after AWS SDK initialization.
@@ -159,16 +154,6 @@ private:
unsigned int s3_max_redirects;
bool enable_s3_requests_logging;
bool for_disk_s3;

/// Limits get request per second rate for GET, SELECT and all other requests, excluding throttled by put throttler
/// (i.e. throttles GetObject, HeadObject)
ThrottlerPtr get_request_throttler;

/// Limits put request per second rate for PUT, COPY, POST, LIST requests
/// (i.e. throttles PutObject, CopyObject, ListObjects, CreateMultipartUpload, UploadPartCopy, UploadPart, CompleteMultipartUpload)
/// NOTE: DELETE and CANCEL requests are not throttled by either put or get throttler
ThrottlerPtr put_request_throttler;

const HeaderCollection extra_headers;
};
@@ -88,9 +88,7 @@ TEST(IOTestAwsS3Client, AppendExtraSSECHeaders)
remote_host_filter,
s3_max_redirects,
enable_s3_requests_logging,
/* for_disk_s3 = */ false,
/* get_request_throttler = */ {},
/* put_request_throttler = */ {}
/* for_disk_s3 = */ false
);

client_configuration.endpointOverride = uri.endpoint;
@@ -115,14 +113,12 @@ TEST(IOTestAwsS3Client, AppendExtraSSECHeaders)
ASSERT_TRUE(client);

DB::ReadSettings read_settings;
DB::S3Settings::RequestSettings request_settings;
request_settings.max_single_read_retries = max_single_read_retries;
DB::ReadBufferFromS3 read_buffer(
client,
uri.bucket,
uri.key,
version_id,
request_settings,
max_single_read_retries,
read_settings
);
@@ -573,14 +573,7 @@ public:
/// AWS API tries credentials providers one by one. Some of providers (like ProfileConfigFileAWSCredentialsProvider) can be
/// quite verbose even if nobody configured them. So we use our provider first and only after it use default providers.
{
DB::S3::PocoHTTPClientConfiguration aws_client_configuration = DB::S3::ClientFactory::instance().createClientConfiguration(
configuration.region,
configuration.remote_host_filter,
configuration.s3_max_redirects,
configuration.enable_s3_requests_logging,
configuration.for_disk_s3,
configuration.get_request_throttler,
configuration.put_request_throttler);
DB::S3::PocoHTTPClientConfiguration aws_client_configuration = DB::S3::ClientFactory::instance().createClientConfiguration(configuration.region, configuration.remote_host_filter, configuration.s3_max_redirects, configuration.enable_s3_requests_logging, configuration.for_disk_s3);
AddProvider(std::make_shared<AwsAuthSTSAssumeRoleWebIdentityCredentialsProvider>(aws_client_configuration));
}

@@ -617,14 +610,7 @@ public:
}
else if (Aws::Utils::StringUtils::ToLower(ec2_metadata_disabled.c_str()) != "true")
{
DB::S3::PocoHTTPClientConfiguration aws_client_configuration = DB::S3::ClientFactory::instance().createClientConfiguration(
configuration.region,
configuration.remote_host_filter,
configuration.s3_max_redirects,
configuration.enable_s3_requests_logging,
configuration.for_disk_s3,
configuration.get_request_throttler,
configuration.put_request_throttler);
DB::S3::PocoHTTPClientConfiguration aws_client_configuration = DB::S3::ClientFactory::instance().createClientConfiguration(configuration.region, configuration.remote_host_filter, configuration.s3_max_redirects, configuration.enable_s3_requests_logging, configuration.for_disk_s3);

/// See MakeDefaultHttpResourceClientConfiguration().
/// This is part of EC2 metadata client, but unfortunately it can't be accessed from outside
@@ -745,18 +731,9 @@ namespace S3
const RemoteHostFilter & remote_host_filter,
unsigned int s3_max_redirects,
bool enable_s3_requests_logging,
bool for_disk_s3,
const ThrottlerPtr & get_request_throttler,
const ThrottlerPtr & put_request_throttler)
bool for_disk_s3)
{
return PocoHTTPClientConfiguration(
force_region,
remote_host_filter,
s3_max_redirects,
enable_s3_requests_logging,
for_disk_s3,
get_request_throttler,
put_request_throttler);
return PocoHTTPClientConfiguration(force_region, remote_host_filter, s3_max_redirects, enable_s3_requests_logging, for_disk_s3);
}

URI::URI(const Poco::URI & uri_)

@@ -17,7 +17,6 @@
#include <Poco/URI.h>

#include <Common/Exception.h>
#include <Common/Throttler_fwd.h>

namespace Aws::S3
{
@@ -89,9 +88,7 @@ public:
const RemoteHostFilter & remote_host_filter,
unsigned int s3_max_redirects,
bool enable_s3_requests_logging,
bool for_disk_s3,
const ThrottlerPtr & get_request_throttler,
const ThrottlerPtr & put_request_throttler);
bool for_disk_s3);

private:
ClientFactory();
@@ -71,7 +71,7 @@ WriteBufferFromS3::WriteBufferFromS3(
std::shared_ptr<const Aws::S3::S3Client> client_ptr_,
const String & bucket_,
const String & key_,
const S3Settings::RequestSettings & request_settings_,
const S3Settings::ReadWriteSettings & s3_settings_,
std::optional<std::map<String, String>> object_metadata_,
size_t buffer_size_,
ThreadPoolCallbackRunner<void> schedule_,
@@ -79,10 +79,10 @@ WriteBufferFromS3::WriteBufferFromS3(
: BufferWithOwnMemory<WriteBuffer>(buffer_size_, nullptr, 0)
, bucket(bucket_)
, key(key_)
, request_settings(request_settings_)
, s3_settings(s3_settings_)
, client_ptr(std::move(client_ptr_))
, object_metadata(std::move(object_metadata_))
, upload_part_size(request_settings_.min_upload_part_size)
, upload_part_size(s3_settings_.min_upload_part_size)
, schedule(std::move(schedule_))
, write_settings(write_settings_)
{
@@ -107,7 +107,7 @@ void WriteBufferFromS3::nextImpl()
write_settings.remote_throttler->add(offset());

/// Data size exceeds singlepart upload threshold, need to use multipart upload.
if (multipart_upload_id.empty() && last_part_size > request_settings.max_single_part_upload_size)
if (multipart_upload_id.empty() && last_part_size > s3_settings.max_single_part_upload_size)
createMultipartUpload();

if (!multipart_upload_id.empty() && last_part_size > upload_part_size)
@@ -122,10 +122,10 @@ void WriteBufferFromS3::nextImpl()

void WriteBufferFromS3::allocateBuffer()
{
if (total_parts_uploaded != 0 && total_parts_uploaded % request_settings.upload_part_size_multiply_parts_count_threshold == 0)
if (total_parts_uploaded != 0 && total_parts_uploaded % s3_settings.upload_part_size_multiply_parts_count_threshold == 0)
{
upload_part_size *= request_settings.upload_part_size_multiply_factor;
upload_part_size = std::min(upload_part_size, request_settings.max_upload_part_size);
upload_part_size *= s3_settings.upload_part_size_multiply_factor;
upload_part_size = std::min(upload_part_size, s3_settings.max_upload_part_size);
}

temporary_buffer = Aws::MakeShared<Aws::StringStream>("temporary buffer");
@@ -180,7 +180,7 @@ void WriteBufferFromS3::finalizeImpl()
if (!multipart_upload_id.empty())
completeMultipartUpload();

if (request_settings.check_objects_after_upload)
if (s3_settings.check_objects_after_upload)
{
LOG_TRACE(log, "Checking object {} exists after upload", key);

@@ -370,7 +370,7 @@ void WriteBufferFromS3::completeMultipartUpload()

req.SetMultipartUpload(multipart_upload);

size_t max_retry = std::max(request_settings.max_unexpected_write_error_retries, 1UL);
size_t max_retry = std::max(s3_settings.max_unexpected_write_error_retries, 1UL);
for (size_t i = 0; i < max_retry; ++i)
{
ProfileEvents::increment(ProfileEvents::S3CompleteMultipartUpload);
@@ -476,7 +476,7 @@ void WriteBufferFromS3::fillPutRequest(Aws::S3::Model::PutObjectRequest & req)

void WriteBufferFromS3::processPutRequest(const PutObjectTask & task)
{
size_t max_retry = std::max(request_settings.max_unexpected_write_error_retries, 1UL);
size_t max_retry = std::max(s3_settings.max_unexpected_write_error_retries, 1UL);
for (size_t i = 0; i < max_retry; ++i)
{
ProfileEvents::increment(ProfileEvents::S3PutObject);

@@ -50,7 +50,7 @@ public:
std::shared_ptr<const Aws::S3::S3Client> client_ptr_,
const String & bucket_,
const String & key_,
const S3Settings::RequestSettings & request_settings_,
const S3Settings::ReadWriteSettings & s3_settings_,
std::optional<std::map<String, String>> object_metadata_ = std::nullopt,
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE,
ThreadPoolCallbackRunner<void> schedule_ = {},
@@ -88,7 +88,7 @@ private:

const String bucket;
const String key;
const S3Settings::RequestSettings request_settings;
const S3Settings::ReadWriteSettings s3_settings;
const std::shared_ptr<const Aws::S3::S3Client> client_ptr;
const std::optional<std::map<String, String>> object_metadata;
@@ -498,17 +498,6 @@ void Planner::buildQueryPlanIfNeeded()
should_produce_results_in_order_of_bucket_number);
query_plan.addStep(std::move(aggregating_step));

if (query_node.isGroupByWithRollup())
{
auto rollup_step = std::make_unique<RollupStep>(query_plan.getCurrentDataStream(), std::move(aggregator_params), true /*final*/, settings.group_by_use_nulls);
query_plan.addStep(std::move(rollup_step));
}
else if (query_node.isGroupByWithCube())
{
auto cube_step = std::make_unique<CubeStep>(query_plan.getCurrentDataStream(), std::move(aggregator_params), true /*final*/, settings.group_by_use_nulls);
query_plan.addStep(std::move(cube_step));
}

if (query_node.isGroupByWithTotals())
{
const auto & having_analysis_result = expression_analysis_result.getHaving();
@@ -528,6 +517,17 @@ void Planner::buildQueryPlanIfNeeded()

query_plan.addStep(std::move(totals_having_step));
}

if (query_node.isGroupByWithRollup())
{
auto rollup_step = std::make_unique<RollupStep>(query_plan.getCurrentDataStream(), std::move(aggregator_params), true /*final*/, settings.group_by_use_nulls);
query_plan.addStep(std::move(rollup_step));
}
else if (query_node.isGroupByWithCube())
{
auto cube_step = std::make_unique<CubeStep>(query_plan.getCurrentDataStream(), std::move(aggregator_params), true /*final*/, settings.group_by_use_nulls);
query_plan.addStep(std::move(cube_step));
}
}

if (!having_executed && expression_analysis_result.hasHaving())
@@ -118,7 +118,7 @@ struct URLBasedDataSourceConfiguration
struct StorageS3Configuration : URLBasedDataSourceConfiguration
{
S3::AuthSettings auth_settings;
S3Settings::RequestSettings request_settings;
S3Settings::ReadWriteSettings rw_settings;
};
@@ -151,14 +151,12 @@ std::vector<String> JsonMetadataGetter::getJsonLogFiles()
std::shared_ptr<ReadBuffer> JsonMetadataGetter::createS3ReadBuffer(const String & key, ContextPtr context)
{
/// TODO: add parallel downloads
S3Settings::RequestSettings request_settings;
request_settings.max_single_read_retries = 10;
return std::make_shared<ReadBufferFromS3>(
base_configuration.client,
base_configuration.uri.bucket,
key,
base_configuration.uri.version_id,
request_settings,
/* max single read retries */10,
context->getReadSettings());
}

@@ -189,7 +187,7 @@ StorageDelta::StorageDelta(
ContextPtr context_,
std::optional<FormatSettings> format_settings_)
: IStorage(table_id_)
, base_configuration{configuration_.url, configuration_.auth_settings, configuration_.request_settings, configuration_.headers}
, base_configuration{configuration_.url, configuration_.auth_settings, configuration_.rw_settings, configuration_.headers}
, log(&Poco::Logger::get("StorageDeltaLake (" + table_id_.table_name + ")"))
, table_path(base_configuration.uri.key)
{

@@ -37,7 +37,7 @@ StorageHudi::StorageHudi(
ContextPtr context_,
std::optional<FormatSettings> format_settings_)
: IStorage(table_id_)
, base_configuration{configuration_.url, configuration_.auth_settings, configuration_.request_settings, configuration_.headers}
, base_configuration{configuration_.url, configuration_.auth_settings, configuration_.rw_settings, configuration_.headers}
, log(&Poco::Logger::get("StorageHudi (" + table_id_.table_name + ")"))
, table_path(base_configuration.uri.key)
{
@@ -100,8 +100,7 @@ public:
const Block & virtual_header_,
ContextPtr context_,
std::unordered_map<String, S3::ObjectInfo> * object_infos_,
Strings * read_keys_,
const S3Settings::RequestSettings & request_settings_)
Strings * read_keys_)
: WithContext(context_)
, client(client_)
, globbed_uri(globbed_uri_)
@@ -109,7 +108,6 @@ public:
, virtual_header(virtual_header_)
, object_infos(object_infos_)
, read_keys(read_keys_)
, request_settings(request_settings_)
{
if (globbed_uri.bucket.find_first_of("*?{") != globbed_uri.bucket.npos)
throw Exception("Expression can not have wildcards inside bucket name", ErrorCodes::UNEXPECTED_EXPRESSION);
@@ -260,7 +258,6 @@ private:
bool is_finished{false};
std::unordered_map<String, S3::ObjectInfo> * object_infos;
Strings * read_keys;
S3Settings::RequestSettings request_settings;
};

StorageS3Source::DisclosedGlobIterator::DisclosedGlobIterator(
@@ -270,9 +267,8 @@ StorageS3Source::DisclosedGlobIterator::DisclosedGlobIterator(
const Block & virtual_header,
ContextPtr context,
std::unordered_map<String, S3::ObjectInfo> * object_infos_,
Strings * read_keys_,
const S3Settings::RequestSettings & request_settings_)
: pimpl(std::make_shared<StorageS3Source::DisclosedGlobIterator::Impl>(client_, globbed_uri_, query, virtual_header, context, object_infos_, read_keys_, request_settings_))
Strings * read_keys_)
: pimpl(std::make_shared<StorageS3Source::DisclosedGlobIterator::Impl>(client_, globbed_uri_, query, virtual_header, context, object_infos_, read_keys_))
{
}

@@ -385,7 +381,7 @@ StorageS3Source::StorageS3Source(
std::optional<FormatSettings> format_settings_,
const ColumnsDescription & columns_,
UInt64 max_block_size_,
const S3Settings::RequestSettings & request_settings_,
UInt64 max_single_read_retries_,
String compression_hint_,
const std::shared_ptr<const Aws::S3::S3Client> & client_,
const String & bucket_,
@@ -401,7 +397,7 @@ StorageS3Source::StorageS3Source(
, format(format_)
, columns_desc(columns_)
, max_block_size(max_block_size_)
, request_settings(request_settings_)
, max_single_read_retries(max_single_read_retries_)
, compression_hint(std::move(compression_hint_))
, client(client_)
, sample_block(sample_block_)
@@ -467,7 +463,7 @@ std::unique_ptr<ReadBuffer> StorageS3Source::createS3ReadBuffer(const String & k
if (!use_parallel_download || object_too_small)
{
LOG_TRACE(log, "Downloading object of size {} from S3 in single thread", object_size);
return std::make_unique<ReadBufferFromS3>(client, bucket, key, version_id, request_settings, getContext()->getReadSettings());
return std::make_unique<ReadBufferFromS3>(client, bucket, key, version_id, max_single_read_retries, getContext()->getReadSettings());
}

assert(object_size > 0);
@@ -479,7 +475,7 @@ std::unique_ptr<ReadBuffer> StorageS3Source::createS3ReadBuffer(const String & k
}

auto factory = std::make_unique<ReadBufferS3Factory>(
client, bucket, key, version_id, download_buffer_size, object_size, request_settings, getContext()->getReadSettings());
client, bucket, key, version_id, download_buffer_size, object_size, max_single_read_retries, getContext()->getReadSettings());
LOG_TRACE(
log, "Downloading from S3 in {} threads. Object size: {}, Range size: {}.", download_thread_num, object_size, download_buffer_size);
@@ -589,7 +585,7 @@ public:
s3_configuration_.client,
bucket,
key,
s3_configuration_.request_settings,
s3_configuration_.rw_settings,
std::nullopt,
DBMS_DEFAULT_BUFFER_SIZE,
threadPoolCallbackRunner<void>(IOThreadPool::get(), "S3ParallelRead"),
@@ -753,7 +749,7 @@ StorageS3::StorageS3(
bool distributed_processing_,
ASTPtr partition_by_)
: IStorage(table_id_)
, s3_configuration{configuration_.url, configuration_.auth_settings, configuration_.request_settings, configuration_.headers}
, s3_configuration{configuration_.url, configuration_.auth_settings, configuration_.rw_settings, configuration_.headers}
, keys({s3_configuration.uri.key})
, format_name(configuration_.format)
, compression_method(configuration_.compression_method)
@@ -819,7 +815,7 @@ std::shared_ptr<StorageS3Source::IteratorWrapper> StorageS3::createFileIterator(
{
/// Iterate through disclosed globs and make a source for each file
auto glob_iterator = std::make_shared<StorageS3Source::DisclosedGlobIterator>(
*s3_configuration.client, s3_configuration.uri, query, virtual_block, local_context, object_infos, read_keys, s3_configuration.request_settings);
*s3_configuration.client, s3_configuration.uri, query, virtual_block, local_context, object_infos, read_keys);
return std::make_shared<StorageS3Source::IteratorWrapper>([glob_iterator]() { return glob_iterator->next(); });
}
else
@@ -909,7 +905,7 @@ Pipe StorageS3::read(
format_settings,
columns_description,
max_block_size,
s3_configuration.request_settings,
s3_configuration.rw_settings.max_single_read_retries,
compression_method,
s3_configuration.client,
s3_configuration.uri.bucket,
@@ -1026,10 +1022,12 @@ void StorageS3::truncate(const ASTPtr & /* query */, const StorageMetadataPtr &,
void StorageS3::updateS3Configuration(ContextPtr ctx, StorageS3::S3Configuration & upd)
{
auto settings = ctx->getStorageS3Settings().getSettings(upd.uri.uri.toString());
if (upd.request_settings != settings.request_settings)
upd.request_settings = settings.request_settings;
const auto & config_rw_settings = settings.rw_settings;

upd.request_settings.updateFromSettingsIfEmpty(ctx->getSettings());
if (upd.rw_settings != config_rw_settings)
upd.rw_settings = settings.rw_settings;

upd.rw_settings.updateFromSettingsIfEmpty(ctx->getSettings());

if (upd.client)
{
@@ -1047,12 +1045,10 @@ void StorageS3::updateS3Configuration(ContextPtr ctx, StorageS3::S3Configuration
ctx->getRemoteHostFilter(),
static_cast<unsigned>(ctx->getGlobalContext()->getSettingsRef().s3_max_redirects),
ctx->getGlobalContext()->getSettingsRef().enable_s3_requests_logging,
/* for_disk_s3 = */ false,
upd.request_settings.get_request_throttler,
upd.request_settings.put_request_throttler);
/* for_disk_s3 = */ false);

client_configuration.endpointOverride = upd.uri.endpoint;
client_configuration.maxConnections = static_cast<unsigned>(upd.request_settings.max_connections);
client_configuration.maxConnections = static_cast<unsigned>(upd.rw_settings.max_connections);

auto credentials = Aws::Auth::AWSCredentials(upd.auth_settings.access_key_id, upd.auth_settings.secret_access_key);
auto headers = upd.auth_settings.headers;
@@ -1084,17 +1080,17 @@ void StorageS3::processNamedCollectionResult(StorageS3Configuration & configurat
else if (arg_name == "use_environment_credentials")
configuration.auth_settings.use_environment_credentials = checkAndGetLiteralArgument<UInt8>(arg_value, "use_environment_credentials");
else if (arg_name == "max_single_read_retries")
configuration.request_settings.max_single_read_retries = checkAndGetLiteralArgument<UInt64>(arg_value, "max_single_read_retries");
configuration.rw_settings.max_single_read_retries = checkAndGetLiteralArgument<UInt64>(arg_value, "max_single_read_retries");
else if (arg_name == "min_upload_part_size")
configuration.request_settings.min_upload_part_size = checkAndGetLiteralArgument<UInt64>(arg_value, "min_upload_part_size");
configuration.rw_settings.max_single_read_retries = checkAndGetLiteralArgument<UInt64>(arg_value, "min_upload_part_size");
else if (arg_name == "upload_part_size_multiply_factor")
configuration.request_settings.upload_part_size_multiply_factor = checkAndGetLiteralArgument<UInt64>(arg_value, "upload_part_size_multiply_factor");
configuration.rw_settings.max_single_read_retries = checkAndGetLiteralArgument<UInt64>(arg_value, "upload_part_size_multiply_factor");
else if (arg_name == "upload_part_size_multiply_parts_count_threshold")
configuration.request_settings.upload_part_size_multiply_parts_count_threshold = checkAndGetLiteralArgument<UInt64>(arg_value, "upload_part_size_multiply_parts_count_threshold");
configuration.rw_settings.max_single_read_retries = checkAndGetLiteralArgument<UInt64>(arg_value, "upload_part_size_multiply_parts_count_threshold");
else if (arg_name == "max_single_part_upload_size")
configuration.request_settings.max_single_part_upload_size = checkAndGetLiteralArgument<UInt64>(arg_value, "max_single_part_upload_size");
configuration.rw_settings.max_single_read_retries = checkAndGetLiteralArgument<UInt64>(arg_value, "max_single_part_upload_size");
else if (arg_name == "max_connections")
configuration.request_settings.max_connections = checkAndGetLiteralArgument<UInt64>(arg_value, "max_connections");
configuration.rw_settings.max_single_read_retries = checkAndGetLiteralArgument<UInt64>(arg_value, "max_connections");
else
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Unknown key-value argument `{}` for StorageS3, expected: url, [access_key_id, secret_access_key], name of used format and [compression_method].",
@@ -1170,7 +1166,7 @@ ColumnsDescription StorageS3::getTableStructureFromData(
S3Configuration s3_configuration{
configuration.url,
configuration.auth_settings,
S3Settings::RequestSettings(ctx->getSettingsRef()),
S3Settings::ReadWriteSettings(ctx->getSettingsRef()),
configuration.headers};

updateS3Configuration(ctx, s3_configuration);
@@ -1232,7 +1228,7 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl(
int zstd_window_log_max = static_cast<int>(ctx->getSettingsRef().zstd_window_log_max);
return wrapReadBufferWithCompressionMethod(
std::make_unique<ReadBufferFromS3>(
s3_configuration.client, s3_configuration.uri.bucket, key, s3_configuration.uri.version_id, s3_configuration.request_settings, ctx->getReadSettings()),
s3_configuration.client, s3_configuration.uri.bucket, key, s3_configuration.uri.version_id, s3_configuration.rw_settings.max_single_read_retries, ctx->getReadSettings()),
chooseCompressionMethod(key, compression_method),
zstd_window_log_max);
};
@@ -43,8 +43,7 @@ public:
const Block & virtual_header,
ContextPtr context,
std::unordered_map<String, S3::ObjectInfo> * object_infos = nullptr,
Strings * read_keys_ = nullptr,
const S3Settings::RequestSettings & request_settings_ = {});
Strings * read_keys_ = nullptr);

String next();

@@ -80,7 +79,7 @@ public:
std::optional<FormatSettings> format_settings_,
const ColumnsDescription & columns_,
UInt64 max_block_size_,
const S3Settings::RequestSettings & request_settings_,
UInt64 max_single_read_retries_,
String compression_hint_,
const std::shared_ptr<const Aws::S3::S3Client> & client_,
const String & bucket,
@@ -103,7 +102,7 @@ private:
String format;
ColumnsDescription columns_desc;
UInt64 max_block_size;
S3Settings::RequestSettings request_settings;
UInt64 max_single_read_retries;
String compression_hint;
std::shared_ptr<const Aws::S3::S3Client> client;
Block sample_block;
@@ -187,7 +186,7 @@ public:
std::shared_ptr<const Aws::S3::S3Client> client;

S3::AuthSettings auth_settings;
S3Settings::RequestSettings request_settings;
S3Settings::ReadWriteSettings rw_settings;

/// If s3 configuration was passed from ast, then it is static.
/// If from config - it can be changed with config reload.
@@ -199,11 +198,11 @@ public:
S3Configuration(
const String & url_,
const S3::AuthSettings & auth_settings_,
const S3Settings::RequestSettings & request_settings_,
const S3Settings::ReadWriteSettings & rw_settings_,
const HeaderCollection & headers_from_ast_)
: uri(S3::URI(url_))
, auth_settings(auth_settings_)
, request_settings(request_settings_)
, rw_settings(rw_settings_)
, static_configuration(!auth_settings_.access_key_id.empty())
, headers_from_ast(headers_from_ast_) {}
};
@@ -46,7 +46,7 @@ StorageS3Cluster::StorageS3Cluster(
const ConstraintsDescription & constraints_,
ContextPtr context_)
: IStorage(table_id_)
, s3_configuration{configuration_.url, configuration_.auth_settings, configuration_.request_settings, configuration_.headers}
, s3_configuration{configuration_.url, configuration_.auth_settings, configuration_.rw_settings, configuration_.headers}
, filename(configuration_.url)
, cluster_name(configuration_.cluster_name)
, format_name(configuration_.format)
@@ -4,7 +4,6 @@

#include <Poco/Util/AbstractConfiguration.h>
#include <Common/Exception.h>
#include <Common/Throttler.h>
#include <Interpreters/Context.h>
#include <base/unit.h>
#include <boost/algorithm/string/predicate.hpp>
@@ -58,26 +57,18 @@ void StorageS3Settings::loadFromConfig(const String & config_elem, const Poco::U

auto auth_settings = S3::AuthSettings::loadFromConfig(config_elem + "." + key, config);

S3Settings::RequestSettings request_settings;
request_settings.max_single_read_retries = get_uint_for_key(key, "max_single_read_retries", true, settings.s3_max_single_read_retries);
request_settings.min_upload_part_size = get_uint_for_key(key, "min_upload_part_size", true, settings.s3_min_upload_part_size);
request_settings.max_upload_part_size = get_uint_for_key(key, "max_upload_part_size", true, DEFAULT_MAX_UPLOAD_PART_SIZE);
request_settings.upload_part_size_multiply_factor = get_uint_for_key(key, "upload_part_size_multiply_factor", true, settings.s3_upload_part_size_multiply_factor);
request_settings.upload_part_size_multiply_parts_count_threshold = get_uint_for_key(key, "upload_part_size_multiply_parts_count_threshold", true, settings.s3_upload_part_size_multiply_parts_count_threshold);
request_settings.max_single_part_upload_size = get_uint_for_key(key, "max_single_part_upload_size", true, settings.s3_max_single_part_upload_size);
request_settings.max_single_operation_copy_size = get_uint_for_key(key, "max_single_operation_copy_size", true, DEFAULT_MAX_SINGLE_OPERATION_COPY_SIZE);
request_settings.max_connections = get_uint_for_key(key, "max_connections", true, settings.s3_max_connections);
request_settings.check_objects_after_upload = get_bool_for_key(key, "check_objects_after_upload", true, false);
S3Settings::ReadWriteSettings rw_settings;
rw_settings.max_single_read_retries = get_uint_for_key(key, "max_single_read_retries", true, settings.s3_max_single_read_retries);
rw_settings.min_upload_part_size = get_uint_for_key(key, "min_upload_part_size", true, settings.s3_min_upload_part_size);
rw_settings.max_upload_part_size = get_uint_for_key(key, "max_upload_part_size", true, DEFAULT_MAX_UPLOAD_PART_SIZE);
rw_settings.upload_part_size_multiply_factor = get_uint_for_key(key, "upload_part_size_multiply_factor", true, settings.s3_upload_part_size_multiply_factor);
rw_settings.upload_part_size_multiply_parts_count_threshold = get_uint_for_key(key, "upload_part_size_multiply_parts_count_threshold", true, settings.s3_upload_part_size_multiply_parts_count_threshold);
rw_settings.max_single_part_upload_size = get_uint_for_key(key, "max_single_part_upload_size", true, settings.s3_max_single_part_upload_size);
rw_settings.max_single_operation_copy_size = get_uint_for_key(key, "max_single_operation_copy_size", true, DEFAULT_MAX_SINGLE_OPERATION_COPY_SIZE);
rw_settings.max_connections = get_uint_for_key(key, "max_connections", true, settings.s3_max_connections);
rw_settings.check_objects_after_upload = get_bool_for_key(key, "check_objects_after_upload", true, false);

// NOTE: it would be better to reuse old throttlers to avoid losing token bucket state on every config reload, which could lead to exceeding limit for short time. But it is good enough unless very high `burst` values are used.
if (UInt64 max_get_rps = get_uint_for_key(key, "max_get_rps", true, settings.s3_max_get_rps))
request_settings.get_request_throttler = std::make_shared<Throttler>(
max_get_rps, get_uint_for_key(key, "max_get_burst", true, settings.s3_max_get_burst ? settings.s3_max_get_burst : Throttler::default_burst_seconds * max_get_rps));
if (UInt64 max_put_rps = get_uint_for_key(key, "max_put_rps", true, settings.s3_max_put_rps))
request_settings.put_request_throttler = std::make_shared<Throttler>(
max_put_rps, get_uint_for_key(key, "max_put_burst", true, settings.s3_max_put_burst ? settings.s3_max_put_burst : Throttler::default_burst_seconds * max_put_rps));

s3_settings.emplace(endpoint, S3Settings{std::move(auth_settings), std::move(request_settings)});
s3_settings.emplace(endpoint, S3Settings{std::move(auth_settings), std::move(rw_settings)});
}
}
}
@@ -98,7 +89,7 @@ S3Settings StorageS3Settings::getSettings(const String & endpoint) const
return {};
}

S3Settings::RequestSettings::RequestSettings(const Settings & settings)
S3Settings::ReadWriteSettings::ReadWriteSettings(const Settings & settings)
{
max_single_read_retries = settings.s3_max_single_read_retries;
min_upload_part_size = settings.s3_min_upload_part_size;
@@ -108,15 +99,9 @@ S3Settings::RequestSettings::RequestSettings(const Settings & settings)
max_connections = settings.s3_max_connections;
check_objects_after_upload = settings.s3_check_objects_after_upload;
max_unexpected_write_error_retries = settings.s3_max_unexpected_write_error_retries;
if (settings.s3_max_get_rps)
get_request_throttler = std::make_shared<Throttler>(
settings.s3_max_get_rps, settings.s3_max_get_burst ? settings.s3_max_get_burst : Throttler::default_burst_seconds * settings.s3_max_get_rps);
if (settings.s3_max_put_rps)
put_request_throttler = std::make_shared<Throttler>(
settings.s3_max_put_rps, settings.s3_max_put_burst ? settings.s3_max_put_burst : Throttler::default_burst_seconds * settings.s3_max_put_rps);
}

void S3Settings::RequestSettings::updateFromSettingsIfEmpty(const Settings & settings)
void S3Settings::ReadWriteSettings::updateFromSettingsIfEmpty(const Settings & settings)
{
if (!max_single_read_retries)
max_single_read_retries = settings.s3_max_single_read_retries;
@@ -137,12 +122,6 @@ void S3Settings::RequestSettings::updateFromSettingsIfEmpty(const Settings & set
if (!max_unexpected_write_error_retries)
max_unexpected_write_error_retries = settings.s3_max_unexpected_write_error_retries;
check_objects_after_upload = settings.s3_check_objects_after_upload;
if (!get_request_throttler && settings.s3_max_get_rps)
get_request_throttler = std::make_shared<Throttler>(
settings.s3_max_get_rps, settings.s3_max_get_burst ? settings.s3_max_get_burst : Throttler::default_burst_seconds * settings.s3_max_get_rps);
if (!put_request_throttler && settings.s3_max_put_rps)
put_request_throttler = std::make_shared<Throttler>(
settings.s3_max_put_rps, settings.s3_max_put_burst ? settings.s3_max_put_burst : Throttler::default_burst_seconds * settings.s3_max_put_rps);
}

}
@@ -7,7 +7,6 @@
#include <vector>
#include <base/types.h>
#include <Interpreters/Context_fwd.h>
#include <Common/Throttler_fwd.h>
#include <Storages/HeaderCollection.h>

#include <IO/S3Common.h>
@@ -24,7 +23,7 @@ struct Settings;

struct S3Settings
{
struct RequestSettings
struct ReadWriteSettings
{
size_t max_single_read_retries = 0;
size_t min_upload_part_size = 0;
@@ -36,13 +35,11 @@ struct S3Settings
size_t max_connections = 0;
bool check_objects_after_upload = false;
size_t max_unexpected_write_error_retries = 0;
ThrottlerPtr get_request_throttler;
ThrottlerPtr put_request_throttler;

RequestSettings() = default;
explicit RequestSettings(const Settings & settings);
ReadWriteSettings() = default;
explicit ReadWriteSettings(const Settings & settings);

inline bool operator==(const RequestSettings & other) const
inline bool operator==(const ReadWriteSettings & other) const
{
return max_single_read_retries == other.max_single_read_retries
&& min_upload_part_size == other.min_upload_part_size
@@ -53,20 +50,18 @@ struct S3Settings
&& max_single_operation_copy_size == other.max_single_operation_copy_size
&& max_connections == other.max_connections
&& check_objects_after_upload == other.check_objects_after_upload
&& max_unexpected_write_error_retries == other.max_unexpected_write_error_retries
&& get_request_throttler == other.get_request_throttler
&& put_request_throttler == other.put_request_throttler;
&& max_unexpected_write_error_retries == other.max_unexpected_write_error_retries;
}

void updateFromSettingsIfEmpty(const Settings & settings);
};

S3::AuthSettings auth_settings;
RequestSettings request_settings;
ReadWriteSettings rw_settings;

inline bool operator==(const S3Settings & other) const
{
return auth_settings == other.auth_settings && request_settings == other.request_settings;
return auth_settings == other.auth_settings && rw_settings == other.rw_settings;
}
};
@@ -1,2 +0,0 @@
1
1 1 1
@@ -1,26 +0,0 @@
-- Tags: no-fasttest
-- Tag no-fasttest: needs s3

-- Limit S3 PUT request per second rate
SET s3_max_put_rps = 2;
SET s3_max_put_burst = 1;

CREATE TEMPORARY TABLE times (t DateTime);

-- INSERT query requires 3 PUT requests and 1/rps = 0.5 second in between, the first query is not throttled due to burst
INSERT INTO times SELECT now();
INSERT INTO TABLE FUNCTION s3('http://localhost:11111/test/request-throttler.csv', 'test', 'testtest', 'CSV', 'number UInt64') SELECT number FROM numbers(1000000) SETTINGS s3_max_single_part_upload_size = 10000, s3_truncate_on_insert = 1;
INSERT INTO times SELECT now();

SELECT max(t) - min(t) >= 1 FROM times;

SYSTEM FLUSH LOGS;
SELECT ProfileEvents['S3CreateMultipartUpload'] == 1,
ProfileEvents['S3UploadPart'] == 1,
ProfileEvents['S3CompleteMultipartUpload'] == 1
FROM system.query_log
WHERE query LIKE '%request-throttler.csv%'
AND type = 'QueryFinish'
AND current_database = currentDatabase()
ORDER BY query_start_time DESC
LIMIT 1;
@@ -0,0 +1,8 @@
0
0

0
((0.0001)) 0
((0.0001)) 0

((0.0001)) 0
@@ -0,0 +1,5 @@
SET allow_experimental_analyzer = 1;

SELECT anyLast(number) FROM numbers(1) GROUP BY number WITH ROLLUP WITH TOTALS;

SELECT tuple(tuple(0.0001)), anyLast(number) FROM numbers(1) GROUP BY number WITH ROLLUP WITH TOTALS;
@@ -0,0 +1,3 @@
SET allow_experimental_analyzer = 1;

SELECT min(b), x AS b FROM (SELECT max(number) FROM numbers(1)); -- { serverError UNKNOWN_IDENTIFIER }