Merge remote-tracking branch 'origin/master' into pr-local-plan

Igor Nikonov 2024-06-27 20:31:35 +00:00
commit 23d7e0148f
127 changed files with 1678 additions and 589 deletions

View File

@ -159,33 +159,18 @@ jobs:
############################################################################################
##################################### BUILD REPORTER #######################################
############################################################################################
BuilderReport:
Builds_Report:
# run report check for failed builds to indicate the CI error
if: ${{ !cancelled() }}
needs:
- RunConfig
- BuilderDebAarch64
- BuilderDebAsan
- BuilderDebDebug
- BuilderDebRelease
- BuilderDebTsan
uses: ./.github/workflows/reusable_test.yml
with:
test_name: ClickHouse build check
runner_type: style-checker-aarch64
data: ${{ needs.RunConfig.outputs.data }}
BuilderSpecialReport:
# run report check for failed builds to indicate the CI error
if: ${{ !cancelled() }}
needs:
- RunConfig
- BuilderBinDarwin
- BuilderBinDarwinAarch64
uses: ./.github/workflows/reusable_test.yml
with:
test_name: ClickHouse special build check
runner_type: style-checker-aarch64
data: ${{ needs.RunConfig.outputs.data }}
if: ${{ !cancelled() && needs.RunConfig.result == 'success' && contains(fromJson(needs.RunConfig.outputs.data).jobs_data.jobs_to_do, 'Builds') }}
needs: [RunConfig, BuilderDebAarch64, BuilderDebAsan, BuilderDebDebug, BuilderDebRelease, BuilderDebTsan, BuilderBinDarwin, BuilderBinDarwinAarch64]
runs-on: [self-hosted, style-checker-aarch64]
steps:
- name: Check out repository code
uses: ClickHouse/checkout@v1
- name: Builds report
run: |
cd "$GITHUB_WORKSPACE/tests/ci"
python3 ./build_report_check.py --reports package_release package_aarch64 package_asan package_tsan package_debug binary_darwin binary_darwin_aarch64
############################################################################################
#################################### INSTALL PACKAGES ######################################
############################################################################################
@ -256,8 +241,7 @@ jobs:
FinishCheck:
if: ${{ !failure() && !cancelled() }}
needs:
- BuilderReport
- BuilderSpecialReport
- Builds_Report
- FunctionalStatelessTestAsan
- FunctionalStatefulTestDebug
- StressTestTsan
@ -273,5 +257,8 @@ jobs:
- name: Finish label
run: |
cd "$GITHUB_WORKSPACE/tests/ci"
# update mergeable check
python3 merge_pr.py --set-ci-status --wf-status ${{ contains(needs.*.result, 'failure') && 'failure' || 'success' }}
# update overall ci report
python3 finish_check.py --wf-status ${{ contains(needs.*.result, 'failure') && 'failure' || 'success' }}
python3 merge_pr.py

.github/workflows/create_release.yml (vendored, new file, 29 lines added)
View File

@ -0,0 +1,29 @@
name: CreateRelease
concurrency:
group: release
'on':
workflow_dispatch:
inputs:
sha:
description: 'The SHA hash of the commit from which to create the release'
required: true
type: string
type:
description: 'The type of release: "new" for a new release or "patch" for a patch release'
required: true
type: choice
options:
- new
- patch
jobs:
Release:
runs-on: [self-hosted, style-checker-aarch64]
steps:
- name: Check out repository code
uses: ClickHouse/checkout@v1
- name: Print greeting
run: |
python3 ./tests/ci/release.py --commit ${{ inputs.sha }} --type ${{ inputs.type }} --dry-run

View File

@ -117,11 +117,11 @@ jobs:
# Reports should run even if Builds_1/2 fail - run them separately, not in Tests_1/2/3
Builds_Report:
# run report check for failed builds to indicate the CI error
if: ${{ !cancelled() && needs.RunConfig.result == 'success' && contains(fromJson(needs.RunConfig.outputs.data).jobs_data.jobs_to_do, 'ClickHouse build check') }}
if: ${{ !cancelled() && needs.RunConfig.result == 'success' && contains(fromJson(needs.RunConfig.outputs.data).jobs_data.jobs_to_do, 'Builds') }}
needs: [RunConfig, Builds_1, Builds_2]
uses: ./.github/workflows/reusable_test.yml
with:
test_name: ClickHouse build check
test_name: Builds
runner_type: style-checker-aarch64
data: ${{ needs.RunConfig.outputs.data }}

View File

@ -96,20 +96,15 @@ jobs:
stage: Tests_1
data: ${{ needs.RunConfig.outputs.data }}
################################# Stage Final #################################
#
FinishCheck:
if: ${{ !cancelled() }}
CheckReadyForMerge:
if: ${{ !cancelled() && needs.StyleCheck.result == 'success' }}
# Test_2 or Test_3 must not have jobs required for Mergeable check
needs: [RunConfig, BuildDockers, StyleCheck, FastTest, Builds_1, Tests_1]
runs-on: [self-hosted, style-checker-aarch64]
steps:
- name: Check out repository code
uses: ClickHouse/checkout@v1
- name: Check sync status
- name: Check and set merge status
run: |
cd "$GITHUB_WORKSPACE/tests/ci"
python3 sync_pr.py --status
- name: Finish label
run: |
cd "$GITHUB_WORKSPACE/tests/ci"
python3 finish_check.py --wf-status ${{ contains(needs.*.result, 'failure') && 'failure' || 'success' }}
python3 merge_pr.py --set-ci-status --wf-status ${{ contains(needs.*.result, 'failure') && 'failure' || 'success' }}

View File

@ -146,11 +146,11 @@ jobs:
# Reports should run even if Builds_1/2 fail - run them separately (not in Tests_1/2/3)
Builds_Report:
# run report check for failed builds to indicate the CI error
if: ${{ !cancelled() && needs.RunConfig.result == 'success' && contains(fromJson(needs.RunConfig.outputs.data).jobs_data.jobs_to_do, 'ClickHouse build check') }}
if: ${{ !cancelled() && needs.RunConfig.result == 'success' && contains(fromJson(needs.RunConfig.outputs.data).jobs_data.jobs_to_do, 'Builds') }}
needs: [RunConfig, StyleCheck, Builds_1, Builds_2]
uses: ./.github/workflows/reusable_test.yml
with:
test_name: ClickHouse build check
test_name: Builds
runner_type: style-checker-aarch64
data: ${{ needs.RunConfig.outputs.data }}

View File

@ -176,35 +176,18 @@ jobs:
############################################################################################
##################################### BUILD REPORTER #######################################
############################################################################################
BuilderReport:
Builds_Report:
# run report check for failed builds to indicate the CI error
if: ${{ !cancelled() }}
needs:
- RunConfig
- BuilderDebRelease
- BuilderDebAarch64
- BuilderDebAsan
- BuilderDebTsan
- BuilderDebUBsan
- BuilderDebMsan
- BuilderDebDebug
uses: ./.github/workflows/reusable_test.yml
with:
test_name: ClickHouse build check
runner_type: style-checker-aarch64
data: ${{ needs.RunConfig.outputs.data }}
BuilderSpecialReport:
# run report check for failed builds to indicate the CI error
if: ${{ !cancelled() }}
needs:
- RunConfig
- BuilderBinDarwin
- BuilderBinDarwinAarch64
uses: ./.github/workflows/reusable_test.yml
with:
test_name: ClickHouse special build check
runner_type: style-checker-aarch64
data: ${{ needs.RunConfig.outputs.data }}
if: ${{ !cancelled() && needs.RunConfig.result == 'success' && contains(fromJson(needs.RunConfig.outputs.data).jobs_data.jobs_to_do, 'Builds') }}
needs: [RunConfig, BuilderDebRelease, BuilderDebAarch64, BuilderDebAsan, BuilderDebUBsan, BuilderDebMsan, BuilderDebTsan, BuilderDebDebug, BuilderBinDarwin, BuilderBinDarwinAarch64]
runs-on: [self-hosted, style-checker-aarch64]
steps:
- name: Check out repository code
uses: ClickHouse/checkout@v1
- name: Builds report
run: |
cd "$GITHUB_WORKSPACE/tests/ci"
python3 ./build_report_check.py --reports package_release package_aarch64 package_asan package_msan package_ubsan package_tsan package_debug binary_darwin binary_darwin_aarch64
MarkReleaseReady:
if: ${{ !failure() && !cancelled() }}
needs:
@ -460,8 +443,7 @@ jobs:
needs:
- DockerServerImage
- DockerKeeperImage
- BuilderReport
- BuilderSpecialReport
- Builds_Report
- MarkReleaseReady
- FunctionalStatelessTestDebug
- FunctionalStatelessTestRelease
@ -496,4 +478,7 @@ jobs:
- name: Finish label
run: |
cd "$GITHUB_WORKSPACE/tests/ci"
# update mergeable check
python3 merge_pr.py --set-ci-status --wf-status ${{ contains(needs.*.result, 'failure') && 'failure' || 'success' }}
# update overall ci report
python3 finish_check.py --wf-status ${{ contains(needs.*.result, 'failure') && 'failure' || 'success' }}

View File

@ -29,17 +29,17 @@
* Added `merge_workload` and `mutation_workload` settings to regulate how resources are utilized and shared between merges, mutations and other workloads. [#64061](https://github.com/ClickHouse/ClickHouse/pull/64061) ([Sergei Trifonov](https://github.com/serxa)).
* Add support for comparing IPv4 and IPv6 types using the `=` operator. [#64292](https://github.com/ClickHouse/ClickHouse/pull/64292) ([Francisco J. Jurado Moreno](https://github.com/Beetelbrox)).
* Allow to store named collections in zookeeper. [#64574](https://github.com/ClickHouse/ClickHouse/pull/64574) ([Kseniia Sumarokova](https://github.com/kssenii)).
* Support decimal arguments in binary math functions (pow(), atan2(), max2, min2(), hypot(). [#64582](https://github.com/ClickHouse/ClickHouse/pull/64582) ([Mikhail Gorshkov](https://github.com/mgorshkov)).
* Support decimal arguments in binary math functions (pow, atan2, max2, min2, hypot). [#64582](https://github.com/ClickHouse/ClickHouse/pull/64582) ([Mikhail Gorshkov](https://github.com/mgorshkov)).
* Add support for index analysis over `hilbertEncode`. [#64662](https://github.com/ClickHouse/ClickHouse/pull/64662) ([Artem Mustafin](https://github.com/Artemmm91)).
* Added SQL functions `parseReadableSize` (along with `OrNull` and `OrZero` variants). [#64742](https://github.com/ClickHouse/ClickHouse/pull/64742) ([Francisco J. Jurado Moreno](https://github.com/Beetelbrox)).
* Add server settings `max_table_num_to_throw` and `max_database_num_to_throw` to limit the number of databases or tables on `CREATE` queries. [#64781](https://github.com/ClickHouse/ClickHouse/pull/64781) ([Xu Jia](https://github.com/XuJia0210)).
* Add _time virtual column to file alike storages (s3/file/hdfs/url/azureBlobStorage). [#64947](https://github.com/ClickHouse/ClickHouse/pull/64947) ([Ilya Golshtein](https://github.com/ilejn)).
* Introduced new functions `base64UrlEncode`, `base64UrlDecode` and `tryBase64UrlDecode`. [#64991](https://github.com/ClickHouse/ClickHouse/pull/64991) ([Mikhail Gorshkov](https://github.com/mgorshkov)).
* Introduced new functions `base64URLEncode`, `base64URLDecode` and `tryBase64URLDecode`. [#64991](https://github.com/ClickHouse/ClickHouse/pull/64991) ([Mikhail Gorshkov](https://github.com/mgorshkov)).
* Add new function `editDistanceUTF8`, which calculates the [edit distance](https://en.wikipedia.org/wiki/Edit_distance) between two UTF8 strings. [#65269](https://github.com/ClickHouse/ClickHouse/pull/65269) ([LiuNeng](https://github.com/liuneng1994)).
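A minimal sketch of the new `editDistanceUTF8` from the last entry, contrasted with the byte-based `editDistance`; the example strings are illustrative:
```sql
-- 'ü' takes two bytes in UTF-8, so the byte-level distance is larger than the
-- code-point-level distance computed by editDistanceUTF8.
SELECT
    editDistance('München', 'Munchen')     AS byte_level,       -- 2
    editDistanceUTF8('München', 'Munchen') AS code_point_level; -- 1
```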
#### Performance Improvement
* Add a native parquet reader, which can read parquet binary to ClickHouse Columns directly. It's controlled by the setting `input_format_parquet_use_native_reader` (disabled by default). [#60361](https://github.com/ClickHouse/ClickHouse/pull/60361) ([ZhiHong Zhang](https://github.com/copperybean)).
* Reduce the number of virtual function calls in ColumnNullable::size(). [#60556](https://github.com/ClickHouse/ClickHouse/pull/60556) ([HappenLee](https://github.com/HappenLee)).
* Reduce the number of virtual function calls in ColumnNullable::size. [#60556](https://github.com/ClickHouse/ClickHouse/pull/60556) ([HappenLee](https://github.com/HappenLee)).
* Speedup `splitByRegexp` when the regular expression argument is a single-character. [#62696](https://github.com/ClickHouse/ClickHouse/pull/62696) ([Robert Schulze](https://github.com/rschu1ze)).
* Speed up FixedHashTable by keeping track of the min and max keys used. This allows to reduce the number of cells that need to be verified. [#62746](https://github.com/ClickHouse/ClickHouse/pull/62746) ([Jiebin Sun](https://github.com/jiebinn)).
* Optimize the resolution of in(LowCardinality, ConstantSet). [#64060](https://github.com/ClickHouse/ClickHouse/pull/64060) ([Zhiguo Zhou](https://github.com/ZhiguoZh)).
@ -51,7 +51,7 @@
* Improve function least/greatest for nullable numeric type arguments. [#64668](https://github.com/ClickHouse/ClickHouse/pull/64668) ([KevinyhZou](https://github.com/KevinyhZou)).
* Allow merging two consequent `FilterSteps` of a query plan. This improves filter-push-down optimization if the filter condition can be pushed down from the parent step. [#64760](https://github.com/ClickHouse/ClickHouse/pull/64760) ([Nikolai Kochetov](https://github.com/KochetovNicolai)).
* Remove bad optimization in vertical final implementation and re-enable vertical final algorithm by default. [#64783](https://github.com/ClickHouse/ClickHouse/pull/64783) ([Duc Canh Le](https://github.com/canhld94)).
* Remove ALIAS nodes from the filter expression. This slightly improves performance for queries with `PREWHERE` (with new analyzer). [#64793](https://github.com/ClickHouse/ClickHouse/pull/64793) ([Nikolai Kochetov](https://github.com/KochetovNicolai)).
* Remove ALIAS nodes from the filter expression. This slightly improves performance for queries with `PREWHERE` (with the new analyzer). [#64793](https://github.com/ClickHouse/ClickHouse/pull/64793) ([Nikolai Kochetov](https://github.com/KochetovNicolai)).
* Fix performance regression in cross join introduced in [#60459](https://github.com/ClickHouse/ClickHouse/issues/60459) (24.5). [#65243](https://github.com/ClickHouse/ClickHouse/pull/65243) ([Nikita Taranov](https://github.com/nickitat)).
#### Improvement
@ -63,7 +63,7 @@
* Reduce the memory usage when using Azure object storage by using fixed memory allocation, avoiding the allocation of an extra buffer. [#63160](https://github.com/ClickHouse/ClickHouse/pull/63160) ([SmitaRKulkarni](https://github.com/SmitaRKulkarni)).
* Several minor corner case fixes to proxy support & tunneling. [#63427](https://github.com/ClickHouse/ClickHouse/pull/63427) ([Arthur Passos](https://github.com/arthurpassos)).
* Add `http_response_headers` setting to support custom response headers in custom HTTP handlers. [#63562](https://github.com/ClickHouse/ClickHouse/pull/63562) ([Grigorii](https://github.com/GSokol)).
* Improve io_uring resubmits visibility. Rename profile event `IOUringSQEsResubmits` -> `IOUringSQEsResubmitsAsync` and add a new one `IOUringSQEsResubmitsSync`. [#63699](https://github.com/ClickHouse/ClickHouse/pull/63699) ([Tomer Shafir](https://github.com/tomershafir)).
* Improve io_uring resubmit visibility. Rename profile event `IOUringSQEsResubmits` -> `IOUringSQEsResubmitsAsync` and add a new one `IOUringSQEsResubmitsSync`. [#63699](https://github.com/ClickHouse/ClickHouse/pull/63699) ([Tomer Shafir](https://github.com/tomershafir)).
* Introduce assertions to verify all functions are called with columns of the right size. [#63723](https://github.com/ClickHouse/ClickHouse/pull/63723) ([Raúl Marín](https://github.com/Algunenano)).
* `SHOW CREATE TABLE` executed on top of system tables will now show the super handy comment unique for each table which will explain why this table is needed. [#63788](https://github.com/ClickHouse/ClickHouse/pull/63788) ([Nikita Mikhaylov](https://github.com/nikitamikhaylov)).
* Added setting `metadata_storage_type` to keep free space on metadata storage disk. [#64128](https://github.com/ClickHouse/ClickHouse/pull/64128) ([MikhailBurdukov](https://github.com/MikhailBurdukov)).

View File

@ -267,7 +267,7 @@ A pull request can be created even if the work is not completed yet. In this cas
Testing will commence as soon as ClickHouse employees label your PR with a tag “can be tested”. The results of some first checks (e.g. code style) will come in within several minutes. Build check results will arrive within half an hour. And the main set of tests will report itself within an hour.
The system will prepare ClickHouse binary builds for your pull request individually. To retrieve these builds click the “Details” link next to “ClickHouse build check” entry in the list of checks. There you will find direct links to the built .deb packages of ClickHouse which you can deploy even on your production servers (if you have no fear).
The system will prepare ClickHouse binary builds for your pull request individually. To retrieve these builds click the “Details” link next to “Builds” entry in the list of checks. There you will find direct links to the built .deb packages of ClickHouse which you can deploy even on your production servers (if you have no fear).
Most probably some of the builds will fail at first times. This is due to the fact that we check builds both with gcc as well as with clang, with almost all of existing warnings (always with the `-Werror` flag) enabled for clang. On that same page, you can find all of the build logs so that you do not have to build ClickHouse in all of the possible ways.

View File

@ -28,7 +28,7 @@ run, for example, the test `01428_hash_set_nan_key`, change to the repository
folder and run the following command:
```
PATH=$PATH:<path to clickhouse-client> tests/clickhouse-test 01428_hash_set_nan_key
PATH=<path to clickhouse-client>:$PATH tests/clickhouse-test 01428_hash_set_nan_key
```
Test results (`stderr` and `stdout`) are written to files `01428_hash_set_nan_key.[stderr|stdout]` which

View File

@ -314,7 +314,7 @@ For example, to download a aarch64 binary for ClickHouse v23.4, follow these ste
- Find the GitHub pull request for release v23.4: [Release pull request for branch 23.4](https://github.com/ClickHouse/ClickHouse/pull/49238)
- Click "Commits", then click a commit similar to "Update autogenerated version to 23.4.2.1 and contributors" for the particular version you like to install.
- Click the green check / yellow dot / red cross to open the list of CI checks.
- Click "Details" next to "ClickHouse Build Check" in the list, it will open a page similar to [this page](https://s3.amazonaws.com/clickhouse-test-reports/46793/b460eb70bf29b19eadd19a1f959b15d186705394/clickhouse_build_check/report.html)
- Click "Details" next to "Builds" in the list, it will open a page similar to [this page](https://s3.amazonaws.com/clickhouse-test-reports/46793/b460eb70bf29b19eadd19a1f959b15d186705394/clickhouse_build_check/report.html)
- Find the rows with compiler = "clang-*-aarch64" - there are multiple rows.
- Download the artifacts for these builds.

View File

@ -2169,6 +2169,7 @@ To exchange data with Hadoop, you can use [HDFS table engine](/docs/en/engines/t
- [output_format_parquet_compression_method](/docs/en/operations/settings/settings-formats.md/#output_format_parquet_compression_method) - compression method used in output Parquet format. Default value - `lz4`.
- [input_format_parquet_max_block_size](/docs/en/operations/settings/settings-formats.md/#input_format_parquet_max_block_size) - Max block row size for parquet reader. Default value - `65409`.
- [input_format_parquet_prefer_block_bytes](/docs/en/operations/settings/settings-formats.md/#input_format_parquet_prefer_block_bytes) - Average block bytes output by parquet reader. Default value - `16744704`.
- [output_format_parquet_write_page_index](/docs/en/operations/settings/settings-formats.md/#output_format_parquet_write_page_index) - Write a page index into Parquet files. Currently requires `output_format_parquet_use_custom_encoder` to be disabled. Default value - `true`.
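For context, a minimal reader-side sketch of how the settings above could be applied when importing a local file; the values and the path are illustrative:
```sql
-- Lower the Parquet reader block size for a memory-constrained import (illustrative values).
SET input_format_parquet_max_block_size = 8192;
SET input_format_parquet_prefer_block_bytes = 4194304;

SELECT count()
FROM file('data.parquet', 'Parquet');
```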
## ParquetMetadata {data-format-parquet-metadata}

View File

@ -1463,6 +1463,9 @@ Keys:
- `size` Size of the file. Applies to `log` and `errorlog`. Once the file reaches `size`, ClickHouse archives and renames it, and creates a new log file in its place.
- `count` The number of archived log files that ClickHouse stores.
- `console` Send `log` and `errorlog` to the console instead of file. To enable, set to `1` or `true`.
- `console_log_level` Logging level for the console. Defaults to `level`.
- `use_syslog` - Log to syslog as well.
- `syslog_level` - Logging level for logging to syslog.
- `stream_compress` Compress `log` and `errorlog` with `lz4` stream compression. To enable, set to `1` or `true`.
- `formatting` Specify log format to be printed in console log (currently only `json` supported).

View File

@ -1428,6 +1428,13 @@ Average block bytes output by parquet reader. Lowering the configuration in the
Default value: `65409 * 256 = 16744704`
### output_format_parquet_write_page_index {#output_format_parquet_write_page_index}
Adds a page index to Parquet files. To enable this, set `output_format_parquet_use_custom_encoder` = `false` and
`output_format_parquet_write_page_index` = `true`.
Enabled by default.
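A minimal write-side sketch of the setting described above, assuming a clickhouse-client session; the output path is illustrative:
```sql
-- Write a Parquet file with a page index; the custom encoder has to stay disabled for now.
SET output_format_parquet_use_custom_encoder = 0;
SET output_format_parquet_write_page_index = 1;

SELECT number AS id, toString(number) AS value
FROM numbers(1000000)
INTO OUTFILE 'with_page_index.parquet'
FORMAT Parquet;
```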
## Hive format settings {#hive-format-settings}
### input_format_hive_text_fields_delimiter {#input_format_hive_text_fields_delimiter}

View File

@ -1168,14 +1168,14 @@ Result:
└────────────────────────────┘
```
## base64UrlEncode
## base64URLEncode
Encodes an URL (String or FixedString) as base64 with URL-specific modifications, according to [RFC 4648](https://datatracker.ietf.org/doc/html/rfc4648#section-5).
**Syntax**
```sql
base64UrlEncode(url)
base64URLEncode(url)
```
**Arguments**
@ -1189,13 +1189,13 @@ base64UrlEncode(url)
**Example**
``` sql
SELECT base64UrlEncode('https://clickhouse.com');
SELECT base64URLEncode('https://clickhouse.com');
```
Result:
```result
┌─base64UrlEncode('https://clickhouse.com')─┐
┌─base64URLEncode('https://clickhouse.com')─┐
│ aHR0cDovL2NsaWNraG91c2UuY29t │
└───────────────────────────────────────────┘
```
@ -1234,19 +1234,19 @@ Result:
└──────────────────────────────────┘
```
## base64UrlDecode
## base64URLDecode
Accepts a base64-encoded URL and decodes it from base64 with URL-specific modifications, according to [RFC 4648](https://datatracker.ietf.org/doc/html/rfc4648#section-5). Throws an exception in case of an error.
**Syntax**
```sql
base64UrlDecode(encodedUrl)
base64URLDecode(encodedUrl)
```
**Arguments**
- `encodedUrl` — [String](../data-types/string.md) column or constant. If the string is not a valid Base64-encoded value with URL-specific modifications, an exception is thrown.
- `encodedURL` — [String](../data-types/string.md) column or constant. If the string is not a valid Base64-encoded value with URL-specific modifications, an exception is thrown.
**Returned value**
@ -1255,13 +1255,13 @@ base64UrlDecode(encodedUrl)
**Example**
``` sql
SELECT base64UrlDecode('aHR0cDovL2NsaWNraG91c2UuY29t');
SELECT base64URLDecode('aHR0cDovL2NsaWNraG91c2UuY29t');
```
Result:
```result
┌─base64UrlDecode('aHR0cDovL2NsaWNraG91c2UuY29t')─┐
┌─base64URLDecode('aHR0cDovL2NsaWNraG91c2UuY29t')─┐
│ https://clickhouse.com │
└─────────────────────────────────────────────────┘
```
@ -1298,19 +1298,19 @@ SELECT tryBase64Decode('RW5jb2RlZA==') as res, tryBase64Decode('invalid') as res
└────────────┴─────────────┘
```
## tryBase64UrlDecode
## tryBase64URLDecode
Like `base64UrlDecode` but returns an empty string in case of error.
Like `base64URLDecode` but returns an empty string in case of error.
**Syntax**
```sql
tryBase64UrlDecode(encodedUrl)
tryBase64URLDecode(encodedUrl)
```
**Parameters**
- `encodedUrl`: [String](../data-types/string.md) column or constant. If the string is not a valid Base64-encoded value with URL-specific modifications, returns an empty string.
- `encodedURL`: [String](../data-types/string.md) column or constant. If the string is not a valid Base64-encoded value with URL-specific modifications, returns an empty string.
**Returned value**
@ -1321,7 +1321,7 @@ tryBase64UrlDecode(encodedUrl)
Query:
```sql
SELECT tryBase64UrlDecode('aHR0cDovL2NsaWNraG91c2UuY29t') as res, tryBase64Decode('aHR0cHM6Ly9jbGlja') as res_invalid;
SELECT tryBase64URLDecode('aHR0cDovL2NsaWNraG91c2UuY29t') as res, tryBase64Decode('aHR0cHM6Ly9jbGlja') as res_invalid;
```
```response

View File

@ -818,6 +818,40 @@ The same as above, but including query string and fragment.
Example: `/top/news.html?page=2#comments`.
### protocol
Extracts the protocol from a URL.
**Syntax**
```sql
protocol(url)
```
**Arguments**
- `url` — URL to extract protocol from. [String](../data-types/string.md).
**Returned value**
- Protocol, or an empty string if it cannot be determined. [String](../data-types/string.md).
**Example**
Query:
```sql
SELECT protocol('https://clickhouse.com/');
```
Result:
```response
┌─protocol('https://clickhouse.com/')─┐
│ https │
└─────────────────────────────────────┘
```
### queryString
Returns the query string without the initial question mark, `#` and everything after `#`.

View File

@ -283,7 +283,7 @@ Pull request можно создать, даже если работа над з
Tests will be run as soon as ClickHouse employees add the “Can be tested” tag to your pull request. The results of the first checks (code style) will appear within a few minutes. Build results will arrive within about half an hour. The main set of tests will report within an hour.
The system will prepare ClickHouse builds specifically for your pull request. To retrieve them, click the “Details” link next to the “Clickhouse build check” entry in the list of checks. There you will find direct links to the built .deb packages of ClickHouse, which you can even install on your production servers if you like (if you have no fear).
The system will prepare ClickHouse builds specifically for your pull request. To retrieve them, click the “Details” link next to the “Builds” entry in the list of checks. There you will find direct links to the built .deb packages of ClickHouse, which you can even install on your production servers if you like (if you have no fear).
Most likely, some of the builds will fail on the first try. This is because we build the code with both gcc and clang, and building with clang enables almost every existing warning (always with the `-Werror` flag). On that same page you will find the build logs, so you do not have to build ClickHouse in every possible way yourself.

View File

@ -538,7 +538,7 @@ SELECT base58Decode('3dc8KtHrwM');
Synonym: `TO_BASE64`.
## base64UrlEncode(s)
## base64URLEncode(s)
Encodes a URL (String or FixedString) into a base64 representation according to [RFC 4648](https://tools.ietf.org/html/rfc4648).
@ -548,7 +548,7 @@ SELECT base58Decode('3dc8KtHrwM');
Synonym: `FROM_BASE64`.
## base64UrlDecode(s)
## base64URLDecode(s)
Decodes a base64 representation of a URL back into the original string according to [RFC 4648](https://tools.ietf.org/html/rfc4648). Throws an exception if decoding is not possible.
@ -556,9 +556,9 @@ SELECT base58Decode('3dc8KtHrwM');
Works like base64Decode, but returns an empty string if decoding is not possible.
## tryBase64UrlDecode(s)
## tryBase64URLDecode(s)
Works like base64UrlDecode, but returns an empty string if decoding is not possible.
Works like base64URLDecode, but returns an empty string if decoding is not possible.
## endsWith(s, suffix) {#endswith}

View File

@ -29,7 +29,14 @@
-->
<size>1000M</size>
<count>10</count>
<!-- <console>1</console> --> <!-- Default behavior is autodetection (log to console if not daemon mode and is tty) -->
<!-- <console_log_level>trace</console_log_level> -->
<!-- <use_syslog>0</use_syslog> -->
<!-- <syslog_level>trace</syslog_level> -->
<!-- <stream_compress>0</stream_compress> -->
<!-- Per level overrides (legacy):
@ -408,13 +415,11 @@
<!-- Approximate size of mark cache, used in tables of MergeTree family.
In bytes. Cache is single for server. Memory is allocated only on demand.
You should not lower this value.
-->
<mark_cache_size>5368709120</mark_cache_size>
You should not lower this value. -->
<!-- <mark_cache_size>5368709120</mark_cache_size> -->
<!-- For marks of secondary indices.
-->
<index_mark_cache_size>5368709120</index_mark_cache_size>
<!-- For marks of secondary indices. -->
<!-- <index_mark_cache_size>5368709120</index_mark_cache_size> -->
<!-- If you enable the `min_bytes_to_use_mmap_io` setting,
the data in MergeTree tables can be read with mmap to avoid copying from kernel to userspace.
@ -432,13 +437,23 @@
The cache is dropped (the files are closed) automatically on removal of old parts in MergeTree,
also it can be dropped manually by the SYSTEM DROP MMAP CACHE query.
-->
<mmap_cache_size>1000</mmap_cache_size>
<!-- <mmap_cache_size>1024</mmap_cache_size> -->
<!-- Cache size in bytes for compiled expressions.-->
<compiled_expression_cache_size>134217728</compiled_expression_cache_size>
<!-- <compiled_expression_cache_size>134217728</compiled_expression_cache_size> -->
<!-- Cache size in elements for compiled expressions.-->
<compiled_expression_cache_elements_size>10000</compiled_expression_cache_elements_size>
<!-- <compiled_expression_cache_elements_size>10000</compiled_expression_cache_elements_size> -->
<!-- Configuration for the query cache -->
<!--
<query_cache>
<max_size_in_bytes>1073741824</max_size_in_bytes>
<max_entries>1024</max_entries>
<max_entry_size_in_bytes>1048576</max_entry_size_in_bytes>
<max_entry_size_in_rows>30000000</max_entry_size_in_rows>
</query_cache>
-->
<!-- Cache path for custom (created from SQL) cached disks -->
<custom_cached_disks_base_directory>/var/lib/clickhouse/caches/</custom_cached_disks_base_directory>
@ -1642,14 +1657,6 @@
-->
<!-- </kafka> -->
<!-- Configuration for the query cache -->
<query_cache>
<max_size_in_bytes>1073741824</max_size_in_bytes>
<max_entries>1024</max_entries>
<max_entry_size_in_bytes>1048576</max_entry_size_in_bytes>
<max_entry_size_in_rows>30000000</max_entry_size_in_rows>
</query_cache>
<backups>
<allowed_path>backups</allowed_path>

View File

@ -260,7 +260,10 @@ uncompressed_cache_size: 8589934592
# Approximate size of mark cache, used in tables of MergeTree family.
# In bytes. Cache is single for server. Memory is allocated only on demand.
# You should not lower this value.
mark_cache_size: 5368709120
# mark_cache_size: 5368709120
# For marks of secondary indices.
# index_mark_cache_size: 5368709120
# If you enable the `min_bytes_to_use_mmap_io` setting,
# the data in MergeTree tables can be read with mmap to avoid copying from kernel to userspace.
@ -277,13 +280,20 @@ mark_cache_size: 5368709120
# in query or server memory usage - because this memory can be discarded similar to OS page cache.
# The cache is dropped (the files are closed) automatically on removal of old parts in MergeTree,
# also it can be dropped manually by the SYSTEM DROP MMAP CACHE query.
mmap_cache_size: 1000
# mmap_cache_size: 1024
# Cache size in bytes for compiled expressions.
compiled_expression_cache_size: 134217728
# compiled_expression_cache_size: 134217728
# Cache size in elements for compiled expressions.
compiled_expression_cache_elements_size: 10000
# compiled_expression_cache_elements_size: 10000
# Configuration for the query cache
# query_cache:
# max_size_in_bytes: 1073741824
# max_entries: 1024
# max_entry_size_in_bytes: 1048576
# max_entry_size_in_rows: 30000000
# Path to data directory, with trailing slash.
path: /var/lib/clickhouse/

View File

@ -91,7 +91,8 @@ public:
return std::make_shared<DataTypeNumber<PointType>>();
}
bool allocatesMemoryInArena() const override { return false; }
/// MaxIntersectionsData::Allocator uses the arena
bool allocatesMemoryInArena() const override { return true; }
void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena * arena) const override
{

View File

@ -1,3 +1,5 @@
#include <Common/FieldVisitorToString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeNullable.h>
@ -3495,7 +3497,8 @@ ProjectionNames QueryAnalyzer::resolveFunction(QueryTreeNodePtr & node, Identifi
*
* 4. If node has alias, update its value in scope alias map. Deregister alias from expression_aliases_in_resolve_process.
*/
ProjectionNames QueryAnalyzer::resolveExpressionNode(QueryTreeNodePtr & node, IdentifierResolveScope & scope, bool allow_lambda_expression, bool allow_table_expression, bool ignore_alias)
ProjectionNames QueryAnalyzer::resolveExpressionNode(
QueryTreeNodePtr & node, IdentifierResolveScope & scope, bool allow_lambda_expression, bool allow_table_expression, bool ignore_alias)
{
checkStackSize();
@ -4505,7 +4508,36 @@ void QueryAnalyzer::resolveTableFunction(QueryTreeNodePtr & table_function_node,
table_name = table_identifier[1];
}
auto parametrized_view_storage = scope_context->getQueryContext()->buildParametrizedViewStorage(function_ast, database_name, table_name);
/// Collect parametrized view arguments
NameToNameMap view_params;
for (const auto & argument : table_function_node_typed.getArguments())
{
if (auto * arg_func = argument->as<FunctionNode>())
{
if (arg_func->getFunctionName() != "equals")
continue;
auto nodes = arg_func->getArguments().getNodes();
if (nodes.size() != 2)
continue;
if (auto * identifier_node = nodes[0]->as<IdentifierNode>())
{
resolveExpressionNode(nodes[1], scope, /* allow_lambda_expression */false, /* allow_table_function */false);
if (auto * constant = nodes[1]->as<ConstantNode>())
{
view_params[identifier_node->getIdentifier().getFullName()] = convertFieldToString(constant->getValue());
}
}
}
}
auto context = scope_context->getQueryContext();
auto parametrized_view_storage = context->buildParametrizedViewStorage(
database_name,
table_name,
view_params);
if (parametrized_view_storage)
{
auto fake_table_node = std::make_shared<TableNode>(parametrized_view_storage, scope_context);

View File

@ -1,3 +1,4 @@
#include <cstddef>
#include <memory>
#include <Poco/Net/NetException.h>
#include <Core/Defines.h>
@ -37,6 +38,7 @@
#include <Common/FailPoint.h>
#include <Common/config_version.h>
#include <Core/Types.h>
#include "config.h"
#if USE_SSL
@ -68,7 +70,17 @@ namespace ErrorCodes
extern const int EMPTY_DATA_PASSED;
}
Connection::~Connection() = default;
Connection::~Connection()
{
try{
if (connected)
Connection::disconnect();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
Connection::Connection(const String & host_, UInt16 port_,
const String & default_database_,
@ -259,13 +271,31 @@ void Connection::connect(const ConnectionTimeouts & timeouts)
void Connection::disconnect()
{
maybe_compressed_out = nullptr;
in = nullptr;
last_input_packet_type.reset();
std::exception_ptr finalize_exception;
try
{
// finalize() can write and throw an exception.
if (maybe_compressed_out)
maybe_compressed_out->finalize();
}
catch (...)
{
/// Don't throw an exception here, it will leave Connection in invalid state.
finalize_exception = std::current_exception();
if (out)
{
out->cancel();
out = nullptr;
}
}
maybe_compressed_out = nullptr;
try
{
// finalize() can write to socket and throw an exception.
if (out)
out->finalize();
}
@ -278,6 +308,7 @@ void Connection::disconnect()
if (socket)
socket->close();
socket = nullptr;
connected = false;
nonce.reset();
@ -774,6 +805,8 @@ void Connection::sendQuery(
}
maybe_compressed_in.reset();
if (maybe_compressed_out && maybe_compressed_out != out)
maybe_compressed_out->cancel();
maybe_compressed_out.reset();
block_in.reset();
block_logs_in.reset();

View File

@ -57,14 +57,16 @@ void CompressedWriteBuffer::nextImpl()
}
}
CompressedWriteBuffer::~CompressedWriteBuffer()
{
finalize();
}
CompressedWriteBuffer::CompressedWriteBuffer(WriteBuffer & out_, CompressionCodecPtr codec_, size_t buf_size)
: BufferWithOwnMemory<WriteBuffer>(buf_size), out(out_), codec(std::move(codec_))
{
}
CompressedWriteBuffer::~CompressedWriteBuffer()
{
if (!canceled)
finalize();
}
}

View File

@ -90,13 +90,13 @@ static constexpr auto DEFAULT_UNCOMPRESSED_CACHE_POLICY = "SLRU";
static constexpr auto DEFAULT_UNCOMPRESSED_CACHE_MAX_SIZE = 0_MiB;
static constexpr auto DEFAULT_UNCOMPRESSED_CACHE_SIZE_RATIO = 0.5l;
static constexpr auto DEFAULT_MARK_CACHE_POLICY = "SLRU";
static constexpr auto DEFAULT_MARK_CACHE_MAX_SIZE = 5368_MiB;
static constexpr auto DEFAULT_MARK_CACHE_MAX_SIZE = 5_GiB;
static constexpr auto DEFAULT_MARK_CACHE_SIZE_RATIO = 0.5l;
static constexpr auto DEFAULT_INDEX_UNCOMPRESSED_CACHE_POLICY = "SLRU";
static constexpr auto DEFAULT_INDEX_UNCOMPRESSED_CACHE_MAX_SIZE = 0;
static constexpr auto DEFAULT_INDEX_UNCOMPRESSED_CACHE_SIZE_RATIO = 0.5;
static constexpr auto DEFAULT_INDEX_MARK_CACHE_POLICY = "SLRU";
static constexpr auto DEFAULT_INDEX_MARK_CACHE_MAX_SIZE = 5368_MiB;
static constexpr auto DEFAULT_INDEX_MARK_CACHE_MAX_SIZE = 5_GiB;
static constexpr auto DEFAULT_INDEX_MARK_CACHE_SIZE_RATIO = 0.3;
static constexpr auto DEFAULT_MMAP_CACHE_MAX_SIZE = 1_KiB; /// chosen by rolling dice
static constexpr auto DEFAULT_COMPILED_EXPRESSION_CACHE_MAX_SIZE = 128_MiB;

View File

@ -1161,6 +1161,7 @@ class IColumn;
M(Bool, output_format_parquet_parallel_encoding, true, "Do Parquet encoding in multiple threads. Requires output_format_parquet_use_custom_encoder.", 0) \
M(UInt64, output_format_parquet_data_page_size, 1024 * 1024, "Target page size in bytes, before compression.", 0) \
M(UInt64, output_format_parquet_batch_size, 1024, "Check page size every this many rows. Consider decreasing if you have columns with average values size above a few KBs.", 0) \
M(Bool, output_format_parquet_write_page_index, true, "Add a possibility to write page index into parquet files.", 0) \
M(String, output_format_avro_codec, "", "Compression codec used for output. Possible values: 'null', 'deflate', 'snappy', 'zstd'.", 0) \
M(UInt64, output_format_avro_sync_interval, 16 * 1024, "Sync interval in bytes.", 0) \
M(String, output_format_avro_string_column_pattern, "", "For Avro format: regexp of String columns to select as AVRO string.", 0) \

View File

@ -86,6 +86,8 @@ namespace SettingsChangesHistory
/// It's used to implement `compatibility` setting (see https://github.com/ClickHouse/ClickHouse/issues/35972)
static const std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> settings_changes_history =
{
{"24.7", {{"output_format_parquet_write_page_index", false, true, "Add a possibility to write page index into parquet files."},
}},
{"24.6", {{"materialize_skip_indexes_on_insert", true, true, "Added new setting to allow to disable materialization of skip indexes on insert"},
{"materialize_statistics_on_insert", true, true, "Added new setting to allow to disable materialization of statistics on insert"},
{"input_format_parquet_use_native_reader", false, false, "When reading Parquet files, to use native reader instead of arrow reader."},

View File

@ -65,6 +65,7 @@ static constexpr const char * REPLICATED_DATABASE_MARK = "DatabaseReplicated";
static constexpr const char * DROPPED_MARK = "DROPPED";
static constexpr const char * BROKEN_TABLES_SUFFIX = "_broken_tables";
static constexpr const char * BROKEN_REPLICATED_TABLES_SUFFIX = "_broken_replicated_tables";
static constexpr const char * FIRST_REPLICA_DATABASE_NAME = "first_replica_database_name";
static constexpr size_t METADATA_FILE_BUFFER_SIZE = 32768;
@ -465,6 +466,13 @@ void DatabaseReplicated::tryConnectToZooKeeperAndInitDatabase(LoadingStrictnessL
return;
}
/// If not exist, create a node with the database name for introspection.
/// Technically, the database may have different names on different replicas, but this is not a usual case and we only save the first one
auto db_name_path = fs::path(zookeeper_path) / FIRST_REPLICA_DATABASE_NAME;
auto error_code = current_zookeeper->trySet(db_name_path, getDatabaseName());
if (error_code == Coordination::Error::ZNONODE)
current_zookeeper->tryCreate(db_name_path, getDatabaseName(), zkutil::CreateMode::Persistent);
is_readonly = false;
}
catch (...)
@ -1382,6 +1390,13 @@ void DatabaseReplicated::drop(ContextPtr context_)
}
}
void DatabaseReplicated::renameDatabase(ContextPtr query_context, const String & new_name)
{
DatabaseAtomic::renameDatabase(query_context, new_name);
auto db_name_path = fs::path(zookeeper_path) / FIRST_REPLICA_DATABASE_NAME;
getZooKeeper()->set(db_name_path, getDatabaseName());
}
void DatabaseReplicated::stopReplication()
{
if (ddl_worker)

View File

@ -86,6 +86,8 @@ public:
std::vector<UInt8> tryGetAreReplicasActive(const ClusterPtr & cluster_) const;
void renameDatabase(ContextPtr query_context, const String & new_name) override;
friend struct DatabaseReplicatedTask;
friend class DatabaseReplicatedDDLWorker;
private:

View File

@ -171,6 +171,7 @@ FormatSettings getFormatSettings(const ContextPtr & context, const Settings & se
format_settings.parquet.parallel_encoding = settings.output_format_parquet_parallel_encoding;
format_settings.parquet.data_page_size = settings.output_format_parquet_data_page_size;
format_settings.parquet.write_batch_size = settings.output_format_parquet_batch_size;
format_settings.parquet.write_page_index = settings.output_format_parquet_write_page_index;
format_settings.parquet.local_read_min_bytes_for_seek = settings.input_format_parquet_local_file_min_bytes_for_seek;
format_settings.pretty.charset = settings.output_format_pretty_grid_charset.toString() == "ASCII" ? FormatSettings::Pretty::Charset::ASCII : FormatSettings::Pretty::Charset::UTF8;
format_settings.pretty.color = settings.output_format_pretty_color;

View File

@ -275,6 +275,7 @@ struct FormatSettings
bool output_compliant_nested_types = true;
size_t data_page_size = 1024 * 1024;
size_t write_batch_size = 1024;
bool write_page_index = false;
size_t local_read_min_bytes_for_seek = 8192;
} parquet{};

View File

@ -25,10 +25,10 @@ namespace ErrorCodes
enum class Base64Variant : uint8_t
{
Normal,
Url
URL
};
inline std::string preprocessBase64Url(std::string_view src)
inline std::string preprocessBase64URL(std::string_view src)
{
std::string padded_src;
padded_src.reserve(src.size() + 3);
@ -70,7 +70,7 @@ inline std::string preprocessBase64Url(std::string_view src)
return padded_src;
}
inline size_t postprocessBase64Url(UInt8 * dst, size_t out_len)
inline size_t postprocessBase64URL(UInt8 * dst, size_t out_len)
{
// Do symbol substitution as described in https://datatracker.ietf.org/doc/html/rfc4648#section-5
for (size_t i = 0; i < out_len; ++i)
@ -95,7 +95,7 @@ inline size_t postprocessBase64Url(UInt8 * dst, size_t out_len)
template <Base64Variant variant>
struct Base64Encode
{
static constexpr auto name = (variant == Base64Variant::Normal) ? "base64Encode" : "base64UrlEncode";
static constexpr auto name = (variant == Base64Variant::Normal) ? "base64Encode" : "base64URLEncode";
static size_t getBufferSize(size_t string_length, size_t string_count)
{
@ -111,8 +111,8 @@ struct Base64Encode
/// Memory sanitizer doesn't understand if there was uninitialized memory in SIMD register but it was not used in the result of shuffle.
__msan_unpoison(dst, outlen);
if constexpr (variant == Base64Variant::Url)
outlen = postprocessBase64Url(dst, outlen);
if constexpr (variant == Base64Variant::URL)
outlen = postprocessBase64URL(dst, outlen);
return outlen;
}
@ -121,7 +121,7 @@ struct Base64Encode
template <Base64Variant variant>
struct Base64Decode
{
static constexpr auto name = (variant == Base64Variant::Normal) ? "base64Decode" : "base64UrlDecode";
static constexpr auto name = (variant == Base64Variant::Normal) ? "base64Decode" : "base64URLDecode";
static size_t getBufferSize(size_t string_length, size_t string_count)
{
@ -132,9 +132,9 @@ struct Base64Decode
{
int rc;
size_t outlen = 0;
if constexpr (variant == Base64Variant::Url)
if constexpr (variant == Base64Variant::URL)
{
std::string src_padded = preprocessBase64Url(src);
std::string src_padded = preprocessBase64URL(src);
rc = base64_decode(src_padded.data(), src_padded.size(), reinterpret_cast<char *>(dst), &outlen, 0);
}
else
@ -156,7 +156,7 @@ struct Base64Decode
template <Base64Variant variant>
struct TryBase64Decode
{
static constexpr auto name = (variant == Base64Variant::Normal) ? "tryBase64Decode" : "tryBase64UrlDecode";
static constexpr auto name = (variant == Base64Variant::Normal) ? "tryBase64Decode" : "tryBase64URLDecode";
static size_t getBufferSize(size_t string_length, size_t string_count)
{
@ -167,9 +167,9 @@ struct TryBase64Decode
{
int rc;
size_t outlen = 0;
if constexpr (variant == Base64Variant::Url)
if constexpr (variant == Base64Variant::URL)
{
std::string src_padded = preprocessBase64Url(src);
std::string src_padded = preprocessBase64URL(src);
rc = base64_decode(src_padded.data(), src_padded.size(), reinterpret_cast<char *>(dst), &outlen, 0);
}
else

View File

@ -5,16 +5,16 @@
namespace DB
{
REGISTER_FUNCTION(Base64UrlDecode)
REGISTER_FUNCTION(Base64URLDecode)
{
FunctionDocumentation::Description description = R"(Accepts a base64-encoded URL and decodes it from base64 with URL-specific modifications, according to RFC 4648 (https://datatracker.ietf.org/doc/html/rfc4648#section-5).)";
FunctionDocumentation::Syntax syntax = "base64UrlDecode(encodedUrl)";
FunctionDocumentation::Arguments arguments = {{"encodedUrl", "String column or constant. If the string is not a valid Base64-encoded value, an exception is thrown."}};
FunctionDocumentation::Syntax syntax = "base64URLDecode(encodedURL)";
FunctionDocumentation::Arguments arguments = {{"encodedURL", "String column or constant. If the string is not a valid Base64-encoded value, an exception is thrown."}};
FunctionDocumentation::ReturnedValue returned_value = "A string containing the decoded value of the argument.";
FunctionDocumentation::Examples examples = {{"Example", "SELECT base64UrlDecode('aHR0cDovL2NsaWNraG91c2UuY29t')", "https://clickhouse.com"}};
FunctionDocumentation::Examples examples = {{"Example", "SELECT base64URLDecode('aHR0cDovL2NsaWNraG91c2UuY29t')", "https://clickhouse.com"}};
FunctionDocumentation::Categories categories = {"String encoding"};
factory.registerFunction<FunctionBase64Conversion<Base64Decode<Base64Variant::Url>>>({description, syntax, arguments, returned_value, examples, categories});
factory.registerFunction<FunctionBase64Conversion<Base64Decode<Base64Variant::URL>>>({description, syntax, arguments, returned_value, examples, categories});
}
}

View File

@ -5,16 +5,16 @@
namespace DB
{
REGISTER_FUNCTION(Base64UrlEncode)
REGISTER_FUNCTION(Base64URLEncode)
{
FunctionDocumentation::Description description = R"(Encodes an URL (String or FixedString) as base64 with URL-specific modifications, according to RFC 4648 (https://datatracker.ietf.org/doc/html/rfc4648#section-5).)";
FunctionDocumentation::Syntax syntax = "base64UrlEncode(url)";
FunctionDocumentation::Syntax syntax = "base64URLEncode(url)";
FunctionDocumentation::Arguments arguments = {{"url", "String column or constant."}};
FunctionDocumentation::ReturnedValue returned_value = "A string containing the encoded value of the argument.";
FunctionDocumentation::Examples examples = {{"Example", "SELECT base64UrlEncode('https://clickhouse.com')", "aHR0cHM6Ly9jbGlja2hvdXNlLmNvbQ"}};
FunctionDocumentation::Examples examples = {{"Example", "SELECT base64URLEncode('https://clickhouse.com')", "aHR0cHM6Ly9jbGlja2hvdXNlLmNvbQ"}};
FunctionDocumentation::Categories categories = {"String encoding"};
factory.registerFunction<FunctionBase64Conversion<Base64Encode<Base64Variant::Url>>>({description, syntax, arguments, returned_value, examples, categories});
factory.registerFunction<FunctionBase64Conversion<Base64Encode<Base64Variant::URL>>>({description, syntax, arguments, returned_value, examples, categories});
}
}

View File

@ -5,16 +5,16 @@
namespace DB
{
REGISTER_FUNCTION(TryBase64UrlDecode)
REGISTER_FUNCTION(TryBase64URLDecode)
{
FunctionDocumentation::Description description = R"(Decodes an URL from base64, like base64UrlDecode but returns an empty string in case of an error.)";
FunctionDocumentation::Syntax syntax = "tryBase64UrlDecode(encodedUrl)";
FunctionDocumentation::Arguments arguments = {{"encodedUrl", "String column or constant. If the string is not a valid Base64-encoded value with URL-specific modifications, returns an empty string."}};
FunctionDocumentation::Description description = R"(Decodes an URL from base64, like base64URLDecode but returns an empty string in case of an error.)";
FunctionDocumentation::Syntax syntax = "tryBase64URLDecode(encodedUrl)";
FunctionDocumentation::Arguments arguments = {{"encodedURL", "String column or constant. If the string is not a valid Base64-encoded value with URL-specific modifications, returns an empty string."}};
FunctionDocumentation::ReturnedValue returned_value = "A string containing the decoded value of the argument.";
FunctionDocumentation::Examples examples = {{"valid", "SELECT tryBase64UrlDecode('aHR0cHM6Ly9jbGlja2hvdXNlLmNvbQ')", "https://clickhouse.com"}, {"invalid", "SELECT tryBase64UrlDecode('aHR0cHM6Ly9jbGlja')", ""}};
FunctionDocumentation::Examples examples = {{"valid", "SELECT tryBase64URLDecode('aHR0cHM6Ly9jbGlja2hvdXNlLmNvbQ')", "https://clickhouse.com"}, {"invalid", "SELECT tryBase64UrlDecode('aHR0cHM6Ly9jbGlja')", ""}};
FunctionDocumentation::Categories categories = {"String encoding"};
factory.registerFunction<FunctionBase64Conversion<TryBase64Decode<Base64Variant::Url>>>({description, syntax, arguments, returned_value, examples, categories});
factory.registerFunction<FunctionBase64Conversion<TryBase64Decode<Base64Variant::URL>>>({description, syntax, arguments, returned_value, examples, categories});
}
}

View File

@ -83,6 +83,20 @@ void CascadeWriteBuffer::finalizeImpl()
}
}
void CascadeWriteBuffer::cancelImpl() noexcept
{
if (curr_buffer)
curr_buffer->position() = position();
for (auto & buf : prepared_sources)
{
if (buf)
{
buf->cancel();
}
}
}
WriteBuffer * CascadeWriteBuffer::setNextBuffer()
{
if (first_lazy_source_num <= curr_buffer_num && curr_buffer_num < num_sources)

View File

@ -16,7 +16,7 @@ namespace ErrorCodes
* (lazy_sources contains not the pointers themselves, but their delayed constructors)
*
* Firstly, CascadeWriteBuffer redirects data to the first buffer of the sequence
* If current WriteBuffer cannot receive data anymore, it throws special exception MemoryWriteBuffer::CurrentBufferExhausted in nextImpl() body,
* If current WriteBuffer cannot receive data anymore, it throws special exception WriteBuffer::CurrentBufferExhausted in nextImpl() body,
* CascadeWriteBuffer prepares the next buffer and continues redirecting data to it.
* If there are no buffers anymore CascadeWriteBuffer throws an exception.
*
@ -48,6 +48,7 @@ public:
private:
void finalizeImpl() override;
void cancelImpl() noexcept override;
WriteBuffer * setNextBuffer();

View File

@ -16,11 +16,11 @@ namespace DB
class MemoryWriteBuffer : public WriteBuffer, public IReadableWriteBuffer, boost::noncopyable, private Allocator<false>
{
public:
/// Special exception to throw when the current WriteBuffer cannot receive data
/// Special exception to throw when the current MemoryWriteBuffer cannot receive data
class CurrentBufferExhausted : public std::exception
{
public:
const char * what() const noexcept override { return "MemoryWriteBuffer limit is exhausted"; }
const char * what() const noexcept override { return "WriteBuffer limit is exhausted"; }
};
/// Use max_total_size_ = 0 for unlimited storage

View File

@ -11,7 +11,7 @@ namespace DB
WriteBuffer::~WriteBuffer()
{
// That destructor could be called with finalized=false in case of exceptions
if (count() > 0 && !finalized)
if (count() > 0 && !finalized && !canceled)
{
/// It is totally OK to destroy instance without finalization when an exception occurs
/// However it is suspicious to destroy instance without finalization at the green path
@ -20,7 +20,7 @@ WriteBuffer::~WriteBuffer()
LoggerPtr log = getLogger("WriteBuffer");
LOG_ERROR(
log,
"WriteBuffer is not finalized when destructor is called. "
"WriteBuffer is neither finalized nor canceled when destructor is called. "
"No exceptions in flight are detected. "
"The file might not be written at all or might be truncated. "
"Stack trace: {}",
@ -30,4 +30,13 @@ WriteBuffer::~WriteBuffer()
}
}
void WriteBuffer::cancel() noexcept
{
if (canceled || finalized)
return;
LockMemoryExceptionInThread lock(VariableContext::Global);
cancelImpl();
canceled = true;
}
}

View File

@ -59,6 +59,7 @@ public:
*/
pos = working_buffer.begin();
bytes += bytes_in_buffer;
throw;
}
@ -75,7 +76,6 @@ public:
next();
}
void write(const char * from, size_t n)
{
if (finalized)
@ -121,6 +121,9 @@ public:
if (finalized)
return;
if (canceled)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot finalize buffer after cancellation.");
LockMemoryExceptionInThread lock(VariableContext::Global);
try
{
@ -130,11 +133,15 @@ public:
catch (...)
{
pos = working_buffer.begin();
finalized = true;
cancel();
throw;
}
}
void cancel() noexcept;
/// Wait for data to be reliably written. Mainly, call fsync for fd.
/// May be called after finalize() if needed.
virtual void sync()
@ -150,7 +157,12 @@ protected:
next();
}
virtual void cancelImpl() noexcept
{
}
bool finalized = false;
bool canceled = false;
private:
/** Write the data in the buffer (from the beginning of the buffer to the current position).

View File

@ -47,6 +47,11 @@ public:
}
}
void cancelImpl() noexcept override
{
out->cancel();
}
WriteBuffer * getNestedBuffer() { return out; }
protected:

View File

@ -79,6 +79,7 @@ WriteBufferFromFile::~WriteBufferFromFile()
try
{
if (!canceled)
finalize();
}
catch (...)
@ -111,6 +112,7 @@ void WriteBufferFromFile::close()
if (fd < 0)
return;
if (!canceled)
finalize();
if (0 != ::close(fd))

View File

@ -28,6 +28,12 @@ void WriteBufferFromFileDecorator::finalizeImpl()
}
}
void WriteBufferFromFileDecorator::cancelImpl() noexcept
{
SwapHelper swap(*this, *impl);
impl->cancel();
}
WriteBufferFromFileDecorator::~WriteBufferFromFileDecorator()
{
/// It is not a mistake that swap is called here

View File

@ -24,6 +24,8 @@ public:
protected:
void finalizeImpl() override;
void cancelImpl() noexcept override;
std::unique_ptr<WriteBuffer> impl;
private:

View File

@ -107,6 +107,7 @@ WriteBufferFromFileDescriptor::~WriteBufferFromFileDescriptor()
{
try
{
if (!canceled)
finalize();
}
catch (...)

View File

@ -197,6 +197,7 @@ WriteBufferFromPocoSocket::~WriteBufferFromPocoSocket()
{
try
{
if (!canceled)
finalize();
}
catch (...)

View File

@ -224,6 +224,11 @@ void WriteBufferFromS3::finalizeImpl()
}
}
void WriteBufferFromS3::cancelImpl() noexcept
{
tryToAbortMultipartUpload();
}
String WriteBufferFromS3::getVerboseLogDetails() const
{
String multipart_upload_details;
@ -246,7 +251,7 @@ String WriteBufferFromS3::getShortLogDetails() const
bucket, key, multipart_upload_details);
}
void WriteBufferFromS3::tryToAbortMultipartUpload()
void WriteBufferFromS3::tryToAbortMultipartUpload() noexcept
{
try
{
@ -264,8 +269,19 @@ WriteBufferFromS3::~WriteBufferFromS3()
{
LOG_TRACE(limitedLog, "Close WriteBufferFromS3. {}.", getShortLogDetails());
if (canceled)
{
LOG_INFO(
log,
"WriteBufferFromS3 was canceled."
"The file might not be written to S3. "
"{}.",
getVerboseLogDetails());
return;
}
/// That destructor could be called with finalized=false in case of exceptions
if (!finalized)
if (!finalized && !canceled)
{
LOG_INFO(
log,

View File

@ -54,6 +54,8 @@ private:
/// Receives response from the server after sending all data.
void finalizeImpl() override;
void cancelImpl() noexcept override;
String getVerboseLogDetails() const;
String getShortLogDetails() const;
@ -71,7 +73,7 @@ private:
void createMultipartUpload();
void completeMultipartUpload();
void abortMultipartUpload();
void tryToAbortMultipartUpload();
void tryToAbortMultipartUpload() noexcept;
S3::PutObjectRequest getPutRequest(PartData & data);
void makeSinglepartUpload(PartData && data);

View File

@ -63,6 +63,7 @@ public:
~WriteBufferFromVector() override
{
if (!canceled)
finalize();
}

View File

@ -2116,7 +2116,7 @@ StoragePtr Context::executeTableFunction(const ASTPtr & table_expression, const
}
StoragePtr Context::buildParametrizedViewStorage(const ASTPtr & table_expression, const String & database_name, const String & table_name)
StoragePtr Context::buildParametrizedViewStorage(const String & database_name, const String & table_name, const NameToNameMap & param_values)
{
if (table_name.empty())
return nullptr;
@ -2129,8 +2129,7 @@ StoragePtr Context::buildParametrizedViewStorage(const ASTPtr & table_expression
return nullptr;
auto query = original_view->getInMemoryMetadataPtr()->getSelectQuery().inner_query->clone();
NameToNameMap parameterized_view_values = analyzeFunctionParamValues(table_expression, getQueryContext());
StorageView::replaceQueryParametersIfParametrizedView(query, parameterized_view_values);
StorageView::replaceQueryParametersIfParametrizedView(query, param_values);
ASTCreateQuery create;
create.select = query->as<ASTSelectWithUnionQuery>();

View File

@ -774,7 +774,7 @@ public:
/// Overload for the new analyzer. Structure inference is performed in QueryAnalysisPass.
StoragePtr executeTableFunction(const ASTPtr & table_expression, const TableFunctionPtr & table_function_ptr);
StoragePtr buildParametrizedViewStorage(const ASTPtr & table_expression, const String & database_name, const String & table_name);
StoragePtr buildParametrizedViewStorage(const String & database_name, const String & table_name, const NameToNameMap & param_values);
void addViewSource(const StoragePtr & storage);
StoragePtr getViewSource() const;

View File

@ -321,7 +321,12 @@ void Loggers::updateLevels(Poco::Util::AbstractConfiguration & config, Poco::Log
bool should_log_to_console = isatty(STDIN_FILENO) || isatty(STDERR_FILENO);
if (config.getBool("logger.console", false)
|| (!config.hasProperty("logger.console") && !is_daemon && should_log_to_console))
split->setLevel("console", log_level);
{
auto console_log_level_string = config.getString("logger.console_log_level", log_level_string);
auto console_log_level = Poco::Logger::parseLevel(console_log_level_string);
max_log_level = std::max(console_log_level, max_log_level);
split->setLevel("console", console_log_level);
}
else
split->setLevel("console", 0);

View File

@ -82,6 +82,16 @@ private:
/// s3Cluster('cluster_name', 'url', 'aws_access_key_id', 'aws_secret_access_key', ...)
findS3FunctionSecretArguments(/* is_cluster_function= */ true);
}
else if (function.name == "azureBlobStorage")
{
/// azureBlobStorage(connection_string|storage_account_url, container_name, blobpath, account_name, account_key, format, compression, structure)
findAzureBlobStorageFunctionSecretArguments(/* is_cluster_function= */ false);
}
else if (function.name == "azureBlobStorageCluster")
{
/// azureBlobStorageCluster(cluster, connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression, structure])
findAzureBlobStorageFunctionSecretArguments(/* is_cluster_function= */ true);
}
else if ((function.name == "remote") || (function.name == "remoteSecure"))
{
/// remote('addresses_expr', 'db', 'table', 'user', 'password', ...)
@ -169,6 +179,43 @@ private:
markSecretArgument(url_arg_idx + 2);
}
void findAzureBlobStorageFunctionSecretArguments(bool is_cluster_function)
{
/// azureBlobStorageCluster('cluster_name', 'conn_string/storage_account_url', ...) has 'conn_string/storage_account_url' as its second argument.
size_t url_arg_idx = is_cluster_function ? 1 : 0;
if (!is_cluster_function && isNamedCollectionName(0))
{
/// azureBlobStorage(named_collection, ..., account_key = 'account_key', ...)
findSecretNamedArgument("account_key", 1);
return;
}
else if (is_cluster_function && isNamedCollectionName(1))
{
/// azureBlobStorageCluster(cluster, named_collection, ..., account_key = 'account_key', ...)
findSecretNamedArgument("account_key", 2);
return;
}
/// We should check other arguments first because we don't need to do any replacement in case storage_account_url is not used
/// azureBlobStorage(connection_string|storage_account_url, container_name, blobpath, account_name, account_key, format, compression, structure)
/// azureBlobStorageCluster(cluster, connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression, structure])
size_t count = arguments->size();
if ((url_arg_idx + 4 <= count) && (count <= url_arg_idx + 7))
{
String second_arg;
if (tryGetStringFromArgument(url_arg_idx + 3, &second_arg))
{
if (second_arg == "auto" || KnownFormatNames::instance().exists(second_arg))
return; /// The argument is a format name, not account_name: azureBlobStorage(url, container, blobpath, 'format', ...)
}
}
/// We're going to replace 'account_key' with '[HIDDEN]' if account_key is used in the signature
if (url_arg_idx + 4 < count)
markSecretArgument(url_arg_idx + 4);
}
void findURLSecretArguments()
{
if (!isNamedCollectionName(0))

View File

@ -213,6 +213,7 @@ bool IParserColumnDeclaration<NameParser>::parseImpl(Pos & pos, ASTPtr & node, E
return res;
};
/// Keep this list of keywords in sync with ParserDataType::parseImpl().
if (!null_check_without_moving()
&& !s_default.checkWithoutMoving(pos, expected)
&& !s_materialized.checkWithoutMoving(pos, expected)

View File

@ -1,5 +1,6 @@
#include <Parsers/ParserDataType.h>
#include <boost/algorithm/string/case_conv.hpp>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTIdentifier_fwd.h>
@ -103,12 +104,28 @@ bool ParserDataType::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
return false;
tryGetIdentifierNameInto(identifier, type_name);
/// Don't accept things like Array(`x.y`).
/// When parsing we accept quoted type names (e.g. `UInt64`), but when formatting we print them
/// unquoted (e.g. UInt64). This introduces problems when the string in the quotes is garbage:
/// * Array(`x.y`) -> Array(x.y) -> fails to parse
/// * `Null` -> Null -> parses as keyword instead of type name
/// Here we check for these cases and reject.
if (!std::all_of(type_name.begin(), type_name.end(), [](char c) { return isWordCharASCII(c) || c == '$'; }))
{
expected.add(pos, "type name");
return false;
}
/// Keywords that IParserColumnDeclaration recognizes before the type name.
/// E.g. reject CREATE TABLE a (x `Null`) because in "x Null" the Null would be parsed as
/// column attribute rather than type name.
{
String n = type_name;
boost::to_upper(n);
if (n == "NOT" || n == "NULL" || n == "DEFAULT" || n == "MATERIALIZED" || n == "EPHEMERAL" || n == "ALIAS" || n == "AUTO" || n == "PRIMARY" || n == "COMMENT" || n == "CODEC")
{
expected.add(pos, "type name");
return false;
}
}
String type_name_upper = Poco::toUpper(type_name);
String type_name_suffix;
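A standalone approximation of the type-name check described in the comments above; isWordCharASCII is replaced with a plain isalnum/underscore test for illustration:

#include <algorithm>
#include <cctype>
#include <string>

bool looksLikeTypeName(const std::string & type_name)
{
    return !type_name.empty()
        && std::all_of(type_name.begin(), type_name.end(), [](unsigned char c)
           {
               return std::isalnum(c) != 0 || c == '_' || c == '$';
           });
}

// looksLikeTypeName("UInt64") is true, looksLikeTypeName("x.y") is false,
// so Array(`x.y`) is rejected at parse time instead of producing output that cannot be re-parsed.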

View File

@ -321,6 +321,9 @@ void ParquetBlockOutputFormat::writeUsingArrow(std::vector<Chunk> chunks)
parquet::WriterProperties::Builder builder;
builder.version(getParquetVersion(format_settings));
builder.compression(getParquetCompression(format_settings.parquet.output_compression_method));
// Writing the page index is disabled by default.
if (format_settings.parquet.write_page_index)
builder.enable_write_page_index();
parquet::ArrowWriterProperties::Builder writer_props_builder;
if (format_settings.parquet.output_compliant_nested_types)
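A minimal sketch of how the writer properties end up configured, assuming an Arrow/Parquet release recent enough to expose enable_write_page_index(); the helper name and the choice of codec are illustrative:

#include <parquet/properties.h>

#include <memory>

std::shared_ptr<parquet::WriterProperties> makeWriterProperties(bool write_page_index)
{
    parquet::WriterProperties::Builder builder;
    builder.compression(parquet::Compression::SNAPPY);
    if (write_page_index)
        builder.enable_write_page_index();  // off by default, hence the explicit opt-in
    return builder.build();
}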

View File

@ -1,21 +1,57 @@
#include <iostream>
#include <Processors/IProcessor.h>
#include <Common/logger_useful.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromString.h>
namespace DB
{
void IProcessor::cancel()
{
bool already_cancelled = is_cancelled.exchange(true, std::memory_order_acq_rel);
if (already_cancelled)
return;
onCancel();
}
String IProcessor::debug() const
{
WriteBufferFromOwnString buf;
writeString(getName(), buf);
buf.write('\n');
writeString("inputs (hasData, isFinished):\n", buf);
for (const auto & port : inputs)
{
buf.write('\t');
writeBoolText(port.hasData(), buf);
buf.write(' ');
writeBoolText(port.isFinished(), buf);
buf.write('\n');
}
writeString("outputs (hasData, isNeeded):\n", buf);
for (const auto & port : outputs)
{
buf.write('\t');
writeBoolText(port.hasData(), buf);
buf.write(' ');
writeBoolText(port.isNeeded(), buf);
buf.write('\n');
}
buf.finalize();
return buf.str();
}
void IProcessor::dump() const
{
std::cerr << getName() << "\n";
std::cerr << "inputs:\n";
for (const auto & port : inputs)
std::cerr << "\t" << port.hasData() << " " << port.isFinished() << "\n";
std::cerr << "outputs:\n";
for (const auto & port : outputs)
std::cerr << "\t" << port.hasData() << " " << port.isNeeded() << "\n";
std::cerr << debug();
}
@ -39,4 +75,3 @@ std::string IProcessor::statusToName(Status status)
}
}
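The cancel() body moved out of the header boils down to a once-only transition guarded by an atomic exchange. A compact, self-contained sketch of that pattern (illustrative class, not the real IProcessor):

#include <atomic>
#include <iostream>

class Cancellable
{
public:
    virtual ~Cancellable() = default;

    void cancel()
    {
        // exchange() returns the previous value, so onCancel() runs at most once
        // even when cancel() races with itself on several threads.
        bool already_cancelled = is_cancelled.exchange(true, std::memory_order_acq_rel);
        if (already_cancelled)
            return;
        onCancel();
    }

    bool isCancelled() const { return is_cancelled.load(std::memory_order_acquire); }

protected:
    virtual void onCancel() { std::cout << "cancelled once\n"; }

private:
    std::atomic<bool> is_cancelled{false};
};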

View File

@ -238,12 +238,7 @@ public:
/// In case the query was cancelled, the executor will wait till all processors finish their jobs.
/// Generally, there is no reason to check this flag. However, it may be reasonable for long operations (e.g. i/o).
bool isCancelled() const { return is_cancelled.load(std::memory_order_acquire); }
void cancel()
{
bool already_cancelled = is_cancelled.exchange(true, std::memory_order_acq_rel);
if (!already_cancelled)
onCancel();
}
void cancel();
/// Additional method which is called in case if ports were updated while work() method.
/// May be used to stop execution in rare cases.
@ -286,6 +281,7 @@ public:
const auto & getOutputs() const { return outputs; }
/// Debug output.
String debug() const;
void dump() const;
/// Used to print pipeline.

View File

@ -120,7 +120,7 @@ Chunk PostgreSQLSource<T>::generate()
MutableColumns columns = description.sample_block.cloneEmptyColumns();
size_t num_rows = 0;
while (true)
while (!isCancelled())
{
const std::vector<pqxx::zview> * row{stream->read_row()};

View File

@ -9,7 +9,6 @@
#include <base/defines.h>
#include <base/types.h>
#include <Common/logger_useful.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/IColumn.h>
@ -19,6 +18,7 @@
#include <Interpreters/FullSortingMergeJoin.h>
#include <Interpreters/TableJoin.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Processors/Chunk.h>
#include <Processors/Transforms/MergeJoinTransform.h>
@ -40,7 +40,7 @@ FullMergeJoinCursorPtr createCursor(const Block & block, const Names & columns)
desc.reserve(columns.size());
for (const auto & name : columns)
desc.emplace_back(name);
return std::make_unique<FullMergeJoinCursor>(materializeBlock(block), desc);
return std::make_unique<FullMergeJoinCursor>(block, desc);
}
template <bool has_left_nulls, bool has_right_nulls>
@ -234,9 +234,14 @@ void inline addMany(PaddedPODArray<UInt64> & left_or_right_map, size_t idx, size
for (size_t i = 0; i < num; ++i)
left_or_right_map.push_back(idx);
}
}
FullMergeJoinCursor::FullMergeJoinCursor(const Block & sample_block_, const SortDescription & description_)
: sample_block(materializeBlock(sample_block_).cloneEmpty()), desc(description_)
{
}
const Chunk & FullMergeJoinCursor::getCurrent() const
{
return current_chunk;
@ -260,6 +265,10 @@ void FullMergeJoinCursor::setChunk(Chunk && chunk)
return;
}
// should match the structure of sample_block (after materialization)
convertToFullIfConst(chunk);
convertToFullIfSparse(chunk);
current_chunk = std::move(chunk);
cursor = SortCursorImpl(sample_block, current_chunk.getColumns(), desc);
}

View File

@ -193,11 +193,7 @@ private:
class FullMergeJoinCursor : boost::noncopyable
{
public:
explicit FullMergeJoinCursor(const Block & sample_block_, const SortDescription & description_)
: sample_block(sample_block_.cloneEmpty())
, desc(description_)
{
}
explicit FullMergeJoinCursor(const Block & sample_block_, const SortDescription & description_);
bool fullyCompleted() const;
void setChunk(Chunk && chunk);

View File

@ -162,6 +162,7 @@ WriteBufferFromHTTPServerResponse::~WriteBufferFromHTTPServerResponse()
{
try
{
if (!canceled)
finalize();
}
catch (...)

View File

@ -1027,14 +1027,7 @@ catch (...)
{
tryLogCurrentException(log, "Cannot send exception to client");
try
{
used_output.finalize();
}
catch (...)
{
tryLogCurrentException(log, "Cannot flush data to client (after sending exception)");
}
used_output.cancel();
}
void HTTPHandler::formatExceptionForClient(int exception_code, HTTPServerRequest & request, HTTPServerResponse & response, Output & used_output)
@ -1172,7 +1165,7 @@ void HTTPHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse
/// Check if exception was thrown in used_output.finalize().
/// In this case used_output can be in invalid state and we
/// cannot write in it anymore. So, just log this exception.
if (used_output.isFinalized())
if (used_output.isFinalized() || used_output.isCanceled())
{
if (thread_trace_context)
thread_trace_context->root_span.addAttribute("clickhouse.exception", "Cannot flush data to client");
@ -1191,6 +1184,8 @@ void HTTPHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse
if (thread_trace_context)
thread_trace_context->root_span.addAttribute(status);
return;
}
used_output.finalize();

View File

@ -78,6 +78,7 @@ private:
WriteBuffer * out_maybe_delayed_and_compressed = nullptr;
bool finalized = false;
bool canceled = false;
bool exception_is_written = false;
std::function<void(WriteBuffer &, const String &)> exception_writer;
@ -99,6 +100,24 @@ private:
out->finalize();
}
void cancel()
{
if (canceled)
return;
canceled = true;
if (out_compressed_holder)
out_compressed_holder->cancel();
if (out)
out->cancel();
}
bool isCanceled() const
{
return canceled;
}
bool isFinalized() const
{
return finalized;

View File

@ -387,7 +387,7 @@ void TCPHandler::runImpl()
query_scope.emplace(query_context, /* fatal_error_callback */ [this]
{
std::lock_guard lock(fatal_error_mutex);
std::lock_guard lock(out_mutex);
sendLogs();
});
@ -475,7 +475,7 @@ void TCPHandler::runImpl()
Stopwatch watch;
CurrentMetrics::Increment callback_metric_increment(CurrentMetrics::ReadTaskRequestsSent);
std::lock_guard lock(task_callback_mutex);
std::scoped_lock lock(out_mutex, task_callback_mutex);
if (state.cancellation_status == CancellationStatus::FULLY_CANCELLED)
return {};
@ -491,7 +491,7 @@ void TCPHandler::runImpl()
{
Stopwatch watch;
CurrentMetrics::Increment callback_metric_increment(CurrentMetrics::MergeTreeAllRangesAnnouncementsSent);
std::lock_guard lock(task_callback_mutex);
std::scoped_lock lock(out_mutex, task_callback_mutex);
if (state.cancellation_status == CancellationStatus::FULLY_CANCELLED)
return;
@ -505,7 +505,7 @@ void TCPHandler::runImpl()
{
Stopwatch watch;
CurrentMetrics::Increment callback_metric_increment(CurrentMetrics::MergeTreeReadTaskRequestsSent);
std::lock_guard lock(task_callback_mutex);
std::scoped_lock lock(out_mutex, task_callback_mutex);
if (state.cancellation_status == CancellationStatus::FULLY_CANCELLED)
return std::nullopt;
@ -553,7 +553,7 @@ void TCPHandler::runImpl()
{
auto callback = [this]()
{
std::scoped_lock lock(task_callback_mutex, fatal_error_mutex);
std::scoped_lock lock(out_mutex, task_callback_mutex);
if (getQueryCancellationStatus() == CancellationStatus::FULLY_CANCELLED)
return true;
@ -572,7 +572,7 @@ void TCPHandler::runImpl()
finish_or_cancel();
std::lock_guard lock(task_callback_mutex);
std::lock_guard lock(out_mutex);
/// Send final progress after calling onFinish(), since it will update the progress.
///
@ -595,7 +595,7 @@ void TCPHandler::runImpl()
break;
{
std::lock_guard lock(task_callback_mutex);
std::lock_guard lock(out_mutex);
sendLogs();
sendEndOfStream();
}
@ -1014,7 +1014,7 @@ void TCPHandler::processOrdinaryQuery()
if (query_context->getSettingsRef().allow_experimental_query_deduplication)
{
std::lock_guard lock(task_callback_mutex);
std::lock_guard lock(out_mutex);
sendPartUUIDs();
}
@ -1024,13 +1024,13 @@ void TCPHandler::processOrdinaryQuery()
if (header)
{
std::lock_guard lock(task_callback_mutex);
std::lock_guard lock(out_mutex);
sendData(header);
}
}
/// Defer locking to cover a part of the scope below and everything after it
std::unique_lock progress_lock(task_callback_mutex, std::defer_lock);
std::unique_lock out_lock(out_mutex, std::defer_lock);
{
PullingAsyncPipelineExecutor executor(pipeline);
@ -1056,6 +1056,9 @@ void TCPHandler::processOrdinaryQuery()
executor.cancelReading();
}
lock.unlock();
out_lock.lock();
if (after_send_progress.elapsed() / 1000 >= interactive_delay)
{
/// Some time passed and there is a progress.
@ -1071,12 +1074,14 @@ void TCPHandler::processOrdinaryQuery()
if (!state.io.null_format)
sendData(block);
}
out_lock.unlock();
}
/// This lock wasn't acquired before and we make .lock() call here
/// so everything under this line is covered even together
/// with sendProgress() out of the scope
progress_lock.lock();
out_lock.lock();
/** If data has run out, we will send the profiling data and total values to
* the last zero block to be able to use

View File

@ -19,6 +19,7 @@
#include <Formats/NativeReader.h>
#include <Formats/NativeWriter.h>
#include "Core/Types.h"
#include "IServer.h"
#include "Interpreters/AsynchronousInsertQueue.h"
#include "Server/TCPProtocolStackData.h"
@ -225,8 +226,13 @@ private:
std::optional<UInt64> nonce;
String cluster;
/// `out_mutex` protects `out` (WriteBuffer).
/// So it is used for method sendData(), sendProgress(), sendLogs(), etc.
std::mutex out_mutex;
/// `task_callback_mutex` protects tasks callbacks.
/// Inside these callbacks we might also change cancellation status,
/// so it also protects cancellation status checks.
std::mutex task_callback_mutex;
std::mutex fatal_error_mutex;
/// At the moment, only one ongoing query in the connection is supported at a time.
QueryState state;
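A short sketch of the locking discipline these comments describe; the struct is illustrative and only shows why the callbacks take both mutexes with a single std::scoped_lock:

#include <mutex>

struct ConnectionState
{
    std::mutex out_mutex;            // protects the output buffer: sendData(), sendLogs(), sendProgress()
    std::mutex task_callback_mutex;  // protects task callbacks and cancellation status checks

    void sendFromCallback()
    {
        // Locking both with std::scoped_lock acquires them deadlock-free in one shot,
        // so a callback can check cancellation and write to the client atomically.
        std::scoped_lock lock(out_mutex, task_callback_mutex);
        // ... check cancellation, then write to the client ...
    }
};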

View File

@ -531,9 +531,9 @@ bool MergeTask::VerticalMergeStage::prepareVerticalMergeForAllColumns() const
global_ctx->merge_list_element_ptr->columns_written = global_ctx->merging_columns.size();
global_ctx->merge_list_element_ptr->progress.store(ctx->column_sizes->keyColumnsWeight(), std::memory_order_relaxed);
ctx->rows_sources_write_buf->next();
ctx->rows_sources_uncompressed_write_buf->next();
/// Ensure data has been written to disk.
ctx->rows_sources_write_buf->finalize();
ctx->rows_sources_uncompressed_write_buf->finalize();
ctx->rows_sources_uncompressed_write_buf->finalize();
size_t rows_sources_count = ctx->rows_sources_write_buf->count();

View File

@ -245,6 +245,8 @@ void MergeTreeDataPartChecksums::write(WriteBuffer & to) const
writeBinaryLittleEndian(sum.uncompressed_hash, out);
}
}
out.finalize();
}
void MergeTreeDataPartChecksums::addFile(const String & file_name, UInt64 file_size, MergeTreeDataPartChecksum::uint128 file_hash)

View File

@ -54,6 +54,10 @@ void MergeTreeSink::onFinish()
finishDelayedChunk();
}
void MergeTreeSink::onCancel()
{
}
void MergeTreeSink::consume(Chunk chunk)
{
if (num_blocks_processed > 0)

View File

@ -28,6 +28,7 @@ public:
void consume(Chunk chunk) override;
void onStart() override;
void onFinish() override;
void onCancel() override;
private:
StorageMergeTree & storage;

View File

@ -136,6 +136,7 @@ WriteBufferFromHDFS::~WriteBufferFromHDFS()
{
try
{
if (!canceled)
finalize();
}
catch (...)

View File

@ -50,56 +50,58 @@ void StorageObjectStorageSink::consume(Chunk chunk)
void StorageObjectStorageSink::onCancel()
{
std::lock_guard lock(cancel_mutex);
finalize();
cancelBuffers();
releaseBuffers();
cancelled = true;
}
void StorageObjectStorageSink::onException(std::exception_ptr exception)
void StorageObjectStorageSink::onException(std::exception_ptr)
{
std::lock_guard lock(cancel_mutex);
try
{
std::rethrow_exception(exception);
}
catch (...)
{
/// An exception context is needed to properly delete write buffers without finalization.
release();
}
cancelBuffers();
releaseBuffers();
}
void StorageObjectStorageSink::onFinish()
{
std::lock_guard lock(cancel_mutex);
finalize();
finalizeBuffers();
}
void StorageObjectStorageSink::finalize()
void StorageObjectStorageSink::finalizeBuffers()
{
if (!writer)
return;
try
{
writer->finalize();
writer->flush();
writer->finalize();
}
catch (...)
{
/// Stop ParallelFormattingOutputFormat correctly.
release();
releaseBuffers();
throw;
}
write_buf->finalize();
}
void StorageObjectStorageSink::release()
void StorageObjectStorageSink::releaseBuffers()
{
writer.reset();
write_buf.reset();
}
void StorageObjectStorageSink::cancelBuffers()
{
if (writer)
writer->cancel();
if (write_buf)
write_buf->cancel();
}
PartitionedStorageObjectStorageSink::PartitionedStorageObjectStorageSink(
ObjectStoragePtr object_storage_,
ConfigurationPtr configuration_,

View File

@ -35,8 +35,9 @@ private:
bool cancelled = false;
std::mutex cancel_mutex;
void finalize();
void release();
void finalizeBuffers();
void releaseBuffers();
void cancelBuffers();
};
class PartitionedStorageObjectStorageSink : public PartitionedSink

View File

@ -193,21 +193,21 @@ Chunk StorageObjectStorageSource::generate()
progress(num_rows, chunk_size ? chunk_size : chunk.bytes());
const auto & object_info = reader.getObjectInfo();
const auto & filename = object_info.getFileName();
chassert(object_info.metadata);
const auto & filename = object_info->getFileName();
chassert(object_info->metadata);
VirtualColumnUtils::addRequestedFileLikeStorageVirtualsToChunk(
chunk, read_from_format_info.requested_virtual_columns,
{
.path = getUniqueStoragePathIdentifier(*configuration, reader.getObjectInfo(), false),
.size = object_info.metadata->size_bytes,
.path = getUniqueStoragePathIdentifier(*configuration, *object_info, false),
.size = object_info->metadata->size_bytes,
.filename = &filename,
.last_modified = object_info.metadata->last_modified
.last_modified = object_info->metadata->last_modified
});
return chunk;
}
if (reader.getInputFormat() && getContext()->getSettingsRef().use_cache_for_count_from_files)
addNumRowsToCache(reader.getObjectInfo(), total_rows_in_file);
addNumRowsToCache(*reader.getObjectInfo(), total_rows_in_file);
total_rows_in_file = 0;

View File

@ -100,7 +100,7 @@ protected:
PullingPipelineExecutor * operator->() { return reader.get(); }
const PullingPipelineExecutor * operator->() const { return reader.get(); }
const ObjectInfo & getObjectInfo() const { return *object_info; }
ObjectInfoPtr getObjectInfo() const { return object_info; }
const IInputFormat * getInputFormat() const { return dynamic_cast<const IInputFormat *>(source.get()); }
private:

View File

@ -35,6 +35,11 @@ namespace
}
}
void S3QueueIFileMetadata::FileStatus::setProcessingEndTime()
{
processing_end_time = now();
}
void S3QueueIFileMetadata::FileStatus::onProcessing()
{
state = FileStatus::State::Processing;
@ -44,13 +49,15 @@ void S3QueueIFileMetadata::FileStatus::onProcessing()
void S3QueueIFileMetadata::FileStatus::onProcessed()
{
state = FileStatus::State::Processed;
processing_end_time = now();
if (!processing_end_time)
setProcessingEndTime();
}
void S3QueueIFileMetadata::FileStatus::onFailed(const std::string & exception)
{
state = FileStatus::State::Failed;
processing_end_time = now();
if (!processing_end_time)
setProcessingEndTime();
std::lock_guard lock(last_exception_mutex);
last_exception = exception;
}
@ -120,7 +127,14 @@ S3QueueIFileMetadata::~S3QueueIFileMetadata()
{
if (processing_id_version.has_value())
{
file_status->onFailed("Uncaught exception");
if (file_status->getException().empty())
{
if (std::current_exception())
file_status->onFailed(getCurrentExceptionMessage(true));
else
file_status->onFailed("Unprocessed exception");
}
LOG_TEST(log, "Removing processing node in destructor for file: {}", path);
try
{
@ -227,7 +241,16 @@ void S3QueueIFileMetadata::setProcessed()
ProfileEvents::increment(ProfileEvents::S3QueueProcessedFiles);
file_status->onProcessed();
try
{
setProcessedImpl();
}
catch (...)
{
file_status->onFailed(getCurrentExceptionMessage(true));
throw;
}
processing_id.reset();
processing_id_version.reset();
@ -235,18 +258,36 @@ void S3QueueIFileMetadata::setProcessed()
LOG_TRACE(log, "Set file {} as processed (rows: {})", path, file_status->processed_rows);
}
void S3QueueIFileMetadata::setFailed(const std::string & exception)
void S3QueueIFileMetadata::setFailed(const std::string & exception_message, bool reduce_retry_count, bool overwrite_status)
{
LOG_TRACE(log, "Setting file {} as failed (exception: {}, path: {})", path, exception, failed_node_path);
LOG_TRACE(log, "Setting file {} as failed (path: {}, reduce retry count: {}, exception: {})",
path, failed_node_path, reduce_retry_count, exception_message);
ProfileEvents::increment(ProfileEvents::S3QueueFailedFiles);
file_status->onFailed(exception);
node_metadata.last_exception = exception;
if (overwrite_status || file_status->state != FileStatus::State::Failed)
file_status->onFailed(exception_message);
node_metadata.last_exception = exception_message;
if (reduce_retry_count)
{
try
{
if (max_loading_retries == 0)
setFailedNonRetriable();
else
setFailedRetriable();
}
catch (...)
{
auto full_exception = fmt::format(
"First exception: {}, exception while setting file as failed: {}",
exception_message, getCurrentExceptionMessage(true));
file_status->onFailed(full_exception);
throw;
}
}
processing_id.reset();
processing_id_version.reset();
@ -296,19 +337,20 @@ void S3QueueIFileMetadata::setFailedRetriable()
auto zk_client = getZooKeeper();
/// Extract the number of already done retries from node_hash.retriable node if it exists.
Coordination::Requests requests;
Coordination::Stat stat;
std::string res;
if (zk_client->tryGet(retrieable_failed_node_path, res, &stat))
bool has_failed_before = zk_client->tryGet(retrieable_failed_node_path, res, &stat);
if (has_failed_before)
{
auto failed_node_metadata = NodeMetadata::fromString(res);
node_metadata.retries = failed_node_metadata.retries + 1;
file_status->retries = node_metadata.retries;
}
LOG_TRACE(log, "File `{}` failed to process, try {}/{}",
path, node_metadata.retries, max_loading_retries);
LOG_TRACE(log, "File `{}` failed to process, try {}/{}, retries node exists: {} (failed node path: {})",
path, node_metadata.retries, max_loading_retries, has_failed_before, failed_node_path);
Coordination::Requests requests;
if (node_metadata.retries >= max_loading_retries)
{
/// File is no longer retriable.

View File

@ -19,6 +19,7 @@ public:
None
};
void setProcessingEndTime();
void onProcessing();
void onProcessed();
void onFailed(const std::string & exception);
@ -54,13 +55,15 @@ public:
bool setProcessing();
void setProcessed();
void setFailed(const std::string & exception);
void setFailed(const std::string & exception_message, bool reduce_retry_count, bool overwrite_status);
virtual void setProcessedAtStartRequests(
Coordination::Requests & requests,
const zkutil::ZooKeeperPtr & zk_client) = 0;
FileStatusPtr getFileStatus() { return file_status; }
const std::string & getPath() const { return path; }
size_t getMaxTries() const { return max_loading_retries; }
struct NodeMetadata
{

View File

@ -133,6 +133,9 @@ S3QueueMetadata::S3QueueMetadata(const fs::path & zookeeper_path_, const S3Queue
generateRescheduleInterval(
settings.s3queue_cleanup_interval_min_ms, settings.s3queue_cleanup_interval_max_ms));
}
LOG_TRACE(log, "Mode: {}, buckets: {}, processing threads: {}, result buckets num: {}",
settings.mode.toString(), settings.s3queue_buckets, settings.s3queue_processing_threads_num, buckets_num);
}
S3QueueMetadata::~S3QueueMetadata()
@ -219,7 +222,7 @@ S3QueueMetadata::Bucket S3QueueMetadata::getBucketForPath(const std::string & pa
S3QueueOrderedFileMetadata::BucketHolderPtr
S3QueueMetadata::tryAcquireBucket(const Bucket & bucket, const Processor & processor)
{
return S3QueueOrderedFileMetadata::tryAcquireBucket(zookeeper_path, bucket, processor);
return S3QueueOrderedFileMetadata::tryAcquireBucket(zookeeper_path, bucket, processor, log);
}
void S3QueueMetadata::initialize(

View File

@ -45,13 +45,15 @@ S3QueueOrderedFileMetadata::BucketHolder::BucketHolder(
int bucket_version_,
const std::string & bucket_lock_path_,
const std::string & bucket_lock_id_path_,
zkutil::ZooKeeperPtr zk_client_)
zkutil::ZooKeeperPtr zk_client_,
LoggerPtr log_)
: bucket_info(std::make_shared<BucketInfo>(BucketInfo{
.bucket = bucket_,
.bucket_version = bucket_version_,
.bucket_lock_path = bucket_lock_path_,
.bucket_lock_id_path = bucket_lock_id_path_}))
, zk_client(zk_client_)
, log(log_)
{
}
@ -61,7 +63,9 @@ void S3QueueOrderedFileMetadata::BucketHolder::release()
return;
released = true;
LOG_TEST(getLogger("S3QueueBucketHolder"), "Releasing bucket {}", bucket_info->bucket);
LOG_TEST(log, "Releasing bucket {}, version {}",
bucket_info->bucket, bucket_info->bucket_version);
Coordination::Requests requests;
/// Check that bucket lock version has not changed
@ -72,11 +76,24 @@ void S3QueueOrderedFileMetadata::BucketHolder::release()
Coordination::Responses responses;
const auto code = zk_client->tryMulti(requests, responses);
if (code == Coordination::Error::ZOK)
LOG_TEST(log, "Released bucket {}, version {}",
bucket_info->bucket, bucket_info->bucket_version);
else
LOG_TRACE(log,
"Failed to release bucket {}, version {}: {}. "
"This is normal if keeper session expired.",
bucket_info->bucket, bucket_info->bucket_version, code);
zkutil::KeeperMultiException::check(code, requests, responses);
}
S3QueueOrderedFileMetadata::BucketHolder::~BucketHolder()
{
if (!released)
LOG_TEST(log, "Releasing bucket ({}) holder in destructor", bucket_info->bucket);
try
{
release();
@ -154,7 +171,8 @@ S3QueueOrderedFileMetadata::Bucket S3QueueOrderedFileMetadata::getBucketForPath(
S3QueueOrderedFileMetadata::BucketHolderPtr S3QueueOrderedFileMetadata::tryAcquireBucket(
const std::filesystem::path & zk_path,
const Bucket & bucket,
const Processor & processor)
const Processor & processor,
LoggerPtr log_)
{
const auto zk_client = getZooKeeper();
const auto bucket_lock_path = zk_path / "buckets" / toString(bucket) / "lock";
@ -183,7 +201,7 @@ S3QueueOrderedFileMetadata::BucketHolderPtr S3QueueOrderedFileMetadata::tryAcqui
const auto bucket_lock_version = set_response->stat.version;
LOG_TEST(
getLogger("S3QueueOrderedFileMetadata"),
log_,
"Processor {} acquired bucket {} for processing (bucket lock version: {})",
processor, bucket, bucket_lock_version);
@ -192,7 +210,8 @@ S3QueueOrderedFileMetadata::BucketHolderPtr S3QueueOrderedFileMetadata::tryAcqui
bucket_lock_version,
bucket_lock_path,
bucket_lock_id_path,
zk_client);
zk_client,
log_);
}
if (code == Coordination::Error::ZNODEEXISTS)
@ -384,8 +403,11 @@ void S3QueueOrderedFileMetadata::setProcessedImpl()
auto code = zk_client->tryMulti(requests, responses);
if (code == Coordination::Error::ZOK)
{
if (max_loading_retries)
zk_client->tryRemove(failed_node_path + ".retriable", -1);
if (max_loading_retries
&& zk_client->tryRemove(failed_node_path + ".retriable", -1) == Coordination::Error::ZOK)
{
LOG_TEST(log, "Removed node {}.retriable", failed_node_path);
}
return;
}

View File

@ -36,7 +36,8 @@ public:
static BucketHolderPtr tryAcquireBucket(
const std::filesystem::path & zk_path,
const Bucket & bucket,
const Processor & processor);
const Processor & processor,
LoggerPtr log_);
static S3QueueOrderedFileMetadata::Bucket getBucketForPath(const std::string & path, size_t buckets_num);
@ -72,26 +73,32 @@ private:
bool ignore_if_exists);
};
struct S3QueueOrderedFileMetadata::BucketHolder
struct S3QueueOrderedFileMetadata::BucketHolder : private boost::noncopyable
{
BucketHolder(
const Bucket & bucket_,
int bucket_version_,
const std::string & bucket_lock_path_,
const std::string & bucket_lock_id_path_,
zkutil::ZooKeeperPtr zk_client_);
zkutil::ZooKeeperPtr zk_client_,
LoggerPtr log_);
~BucketHolder();
Bucket getBucket() const { return bucket_info->bucket; }
BucketInfoPtr getBucketInfo() const { return bucket_info; }
void setFinished() { finished = true; }
bool isFinished() const { return finished; }
void release();
private:
BucketInfoPtr bucket_info;
const zkutil::ZooKeeperPtr zk_client;
bool released = false;
bool finished = false;
LoggerPtr log;
};
}

View File

@ -19,7 +19,7 @@ class ASTStorage;
0) \
M(S3QueueAction, after_processing, S3QueueAction::KEEP, "Delete or keep file in S3 after successful processing", 0) \
M(String, keeper_path, "", "Zookeeper node path", 0) \
M(UInt32, s3queue_loading_retries, 0, "Retry loading up to specified number of times", 0) \
M(UInt32, s3queue_loading_retries, 10, "Retry loading up to specified number of times", 0) \
M(UInt32, s3queue_processing_threads_num, 1, "Number of processing threads", 0) \
M(UInt32, s3queue_enable_logging_to_s3queue_log, 1, "Enable logging to system table system.s3queue_log", 0) \
M(String, s3queue_last_processed_path, "", "For Ordered mode. Files that have lexicographically smaller file name are considered already processed", 0) \
@ -31,6 +31,10 @@ class ASTStorage;
M(UInt32, s3queue_cleanup_interval_min_ms, 60000, "For unordered mode. Polling backoff min for cleanup", 0) \
M(UInt32, s3queue_cleanup_interval_max_ms, 60000, "For unordered mode. Polling backoff max for cleanup", 0) \
M(UInt32, s3queue_buckets, 0, "Number of buckets for Ordered mode parallel processing", 0) \
M(UInt32, s3queue_max_processed_files_before_commit, 100, "Number of files which can be processed before being committed to keeper", 0) \
M(UInt32, s3queue_max_processed_rows_before_commit, 0, "Number of rows which can be processed before being committed to keeper", 0) \
M(UInt32, s3queue_max_processed_bytes_before_commit, 0, "Number of bytes which can be processed before being committed to keeper", 0) \
M(UInt32, s3queue_max_processing_time_sec_before_commit, 0, "Timeout in seconds after which processed files are committed to keeper", 0) \
#define LIST_OF_S3QUEUE_SETTINGS(M, ALIAS) \
S3QUEUE_RELATED_SETTINGS(M, ALIAS) \

View File

@ -32,16 +32,16 @@ namespace ErrorCodes
}
StorageS3QueueSource::S3QueueObjectInfo::S3QueueObjectInfo(
const ObjectInfo & object_info,
Metadata::FileMetadataPtr processing_holder_)
: ObjectInfo(object_info.relative_path, object_info.metadata)
, processing_holder(processing_holder_)
const Source::ObjectInfo & object_info,
S3QueueMetadata::FileMetadataPtr file_metadata_)
: Source::ObjectInfo(object_info.relative_path, object_info.metadata)
, file_metadata(file_metadata_)
{
}
StorageS3QueueSource::FileIterator::FileIterator(
std::shared_ptr<S3QueueMetadata> metadata_,
std::unique_ptr<GlobIterator> glob_iterator_,
std::unique_ptr<Source::GlobIterator> glob_iterator_,
std::atomic<bool> & shutdown_called_,
LoggerPtr logger_)
: StorageObjectStorageSource::IIterator("S3QueueIterator")
@ -52,25 +52,52 @@ StorageS3QueueSource::FileIterator::FileIterator(
{
}
bool StorageS3QueueSource::FileIterator::isFinished() const
{
LOG_TEST(log, "Iterator finished: {}, objects to retry: {}", iterator_finished, objects_to_retry.size());
return iterator_finished
&& std::all_of(listed_keys_cache.begin(), listed_keys_cache.end(), [](const auto & v) { return v.second.keys.empty(); })
&& objects_to_retry.empty();
}
size_t StorageS3QueueSource::FileIterator::estimatedKeysCount()
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method estimatedKeysCount is not implemented");
}
StorageS3QueueSource::ObjectInfoPtr StorageS3QueueSource::FileIterator::nextImpl(size_t processor)
StorageS3QueueSource::Source::ObjectInfoPtr StorageS3QueueSource::FileIterator::nextImpl(size_t processor)
{
ObjectInfoPtr object_info;
Source::ObjectInfoPtr object_info;
S3QueueOrderedFileMetadata::BucketInfoPtr bucket_info;
while (!shutdown_called)
{
if (metadata->useBucketsForProcessing())
{
std::lock_guard lock(mutex);
std::tie(object_info, bucket_info) = getNextKeyFromAcquiredBucket(processor);
}
else
{
std::lock_guard lock(mutex);
if (objects_to_retry.empty())
{
object_info = glob_iterator->next(processor);
if (!object_info)
iterator_finished = true;
}
else
{
object_info = objects_to_retry.front();
objects_to_retry.pop_front();
}
}
if (!object_info)
{
LOG_TEST(log, "No object left");
return {};
}
if (shutdown_called)
{
@ -85,19 +112,64 @@ StorageS3QueueSource::ObjectInfoPtr StorageS3QueueSource::FileIterator::nextImpl
return {};
}
std::pair<StorageS3QueueSource::ObjectInfoPtr, S3QueueOrderedFileMetadata::BucketInfoPtr>
void StorageS3QueueSource::FileIterator::returnForRetry(Source::ObjectInfoPtr object_info)
{
chassert(object_info);
if (metadata->useBucketsForProcessing())
{
const auto bucket = metadata->getBucketForPath(object_info->relative_path);
listed_keys_cache[bucket].keys.emplace_front(object_info);
}
else
{
objects_to_retry.push_back(object_info);
}
}
void StorageS3QueueSource::FileIterator::releaseFinishedBuckets()
{
for (const auto & [processor, holders] : bucket_holders)
{
LOG_TEST(log, "Releasing {} bucket holders for processor {}", holders.size(), processor);
for (auto it = holders.begin(); it != holders.end(); ++it)
{
const auto & holder = *it;
const auto bucket = holder->getBucketInfo()->bucket;
if (!holder->isFinished())
{
/// Only the last holder in the list of holders can be non-finished.
chassert(std::next(it) == holders.end());
/// Do not release non-finished bucket holder. We will continue processing it.
LOG_TEST(log, "Bucket {} is not finished yet, will not release it", bucket);
break;
}
/// Release bucket lock.
holder->release();
/// Reset bucket processor in cached state.
auto cached_info = listed_keys_cache.find(bucket);
if (cached_info != listed_keys_cache.end())
cached_info->second.processor.reset();
}
}
}
std::pair<StorageS3QueueSource::Source::ObjectInfoPtr, S3QueueOrderedFileMetadata::BucketInfoPtr>
StorageS3QueueSource::FileIterator::getNextKeyFromAcquiredBucket(size_t processor)
{
/// We need this lock to maintain consistency between listing s3 directory
/// and getting/putting result into listed_keys_cache.
std::lock_guard lock(buckets_mutex);
auto bucket_holder_it = bucket_holders.emplace(processor, std::vector<BucketHolderPtr>{}).first;
BucketHolder * current_bucket_holder = bucket_holder_it->second.empty() || bucket_holder_it->second.back()->isFinished()
? nullptr
: bucket_holder_it->second.back().get();
auto bucket_holder_it = bucket_holders.emplace(processor, nullptr).first;
auto current_processor = toString(processor);
LOG_TEST(
log, "Current processor: {}, acquired bucket: {}",
processor, bucket_holder_it->second ? toString(bucket_holder_it->second->getBucket()) : "None");
processor, current_bucket_holder ? toString(current_bucket_holder->getBucket()) : "None");
while (true)
{
@ -106,9 +178,9 @@ StorageS3QueueSource::FileIterator::getNextKeyFromAcquiredBucket(size_t processo
/// In case it is already acquired, they put the key into listed_keys_cache,
/// so that the thread who acquired the bucket will be able to see
/// those keys without the need to list s3 directory once again.
if (bucket_holder_it->second)
if (current_bucket_holder)
{
const auto bucket = bucket_holder_it->second->getBucket();
const auto bucket = current_bucket_holder->getBucket();
auto it = listed_keys_cache.find(bucket);
if (it != listed_keys_cache.end())
{
@ -141,7 +213,7 @@ StorageS3QueueSource::FileIterator::getNextKeyFromAcquiredBucket(size_t processo
LOG_TEST(log, "Current bucket: {}, will process file: {}",
bucket, object_info->getFileName());
return std::pair{object_info, bucket_holder_it->second->getBucketInfo()};
return std::pair{object_info, current_bucket_holder->getBucketInfo()};
}
LOG_TEST(log, "Cache of bucket {} is empty", bucket);
@ -156,9 +228,9 @@ StorageS3QueueSource::FileIterator::getNextKeyFromAcquiredBucket(size_t processo
if (iterator_finished)
{
/// Bucket is fully processed - release the bucket.
bucket_holder_it->second->release();
bucket_holder_it->second.reset();
/// Bucket is fully processed, but we will release it later
/// - once we write and commit files via commit() method.
current_bucket_holder->setFinished();
}
}
/// If processing thread has already acquired some bucket
@ -167,8 +239,10 @@ StorageS3QueueSource::FileIterator::getNextKeyFromAcquiredBucket(size_t processo
/// because one processing thread can acquire only one bucket at a time.
/// Once a thread is finished with its acquired bucket, it checks listed_keys_cache
/// to see if there are keys from buckets not acquired by anyone.
if (!bucket_holder_it->second)
if (!current_bucket_holder)
{
LOG_TEST(log, "Checking caches keys: {}", listed_keys_cache.size());
for (auto it = listed_keys_cache.begin(); it != listed_keys_cache.end();)
{
auto & [bucket, bucket_info] = *it;
@ -193,8 +267,8 @@ StorageS3QueueSource::FileIterator::getNextKeyFromAcquiredBucket(size_t processo
continue;
}
bucket_holder_it->second = metadata->tryAcquireBucket(bucket, current_processor);
if (!bucket_holder_it->second)
auto acquired_bucket = metadata->tryAcquireBucket(bucket, current_processor);
if (!acquired_bucket)
{
LOG_TEST(log, "Bucket {} is already locked for processing (keys: {})",
bucket, bucket_keys.size());
@ -202,6 +276,9 @@ StorageS3QueueSource::FileIterator::getNextKeyFromAcquiredBucket(size_t processo
continue;
}
bucket_holder_it->second.push_back(acquired_bucket);
current_bucket_holder = bucket_holder_it->second.back().get();
bucket_processor = current_processor;
/// Take the key from the front, the order is important.
@ -211,7 +288,7 @@ StorageS3QueueSource::FileIterator::getNextKeyFromAcquiredBucket(size_t processo
LOG_TEST(log, "Acquired bucket: {}, will process file: {}",
bucket, object_info->getFileName());
return std::pair{object_info, bucket_holder_it->second->getBucketInfo()};
return std::pair{object_info, current_bucket_holder->getBucketInfo()};
}
}
@ -229,12 +306,12 @@ StorageS3QueueSource::FileIterator::getNextKeyFromAcquiredBucket(size_t processo
LOG_TEST(log, "Found next file: {}, bucket: {}, current bucket: {}, cached_keys: {}",
object_info->getFileName(), bucket,
bucket_holder_it->second ? toString(bucket_holder_it->second->getBucket()) : "None",
current_bucket_holder ? toString(current_bucket_holder->getBucket()) : "None",
bucket_cache.keys.size());
if (bucket_holder_it->second)
if (current_bucket_holder)
{
if (bucket_holder_it->second->getBucket() != bucket)
if (current_bucket_holder->getBucket() != bucket)
{
/// Acquired bucket differs from object's bucket,
/// put it into bucket's cache and continue.
@ -242,13 +319,16 @@ StorageS3QueueSource::FileIterator::getNextKeyFromAcquiredBucket(size_t processo
continue;
}
/// Bucket is already acquired, process the file.
return std::pair{object_info, bucket_holder_it->second->getBucketInfo()};
return std::pair{object_info, current_bucket_holder->getBucketInfo()};
}
else
{
bucket_holder_it->second = metadata->tryAcquireBucket(bucket, current_processor);
if (bucket_holder_it->second)
auto acquired_bucket = metadata->tryAcquireBucket(bucket, current_processor);
if (acquired_bucket)
{
bucket_holder_it->second.push_back(acquired_bucket);
current_bucket_holder = bucket_holder_it->second.back().get();
bucket_cache.processor = current_processor;
if (!bucket_cache.keys.empty())
{
@ -258,7 +338,7 @@ StorageS3QueueSource::FileIterator::getNextKeyFromAcquiredBucket(size_t processo
object_info = bucket_cache.keys.front();
bucket_cache.keys.pop_front();
}
return std::pair{object_info, bucket_holder_it->second->getBucketInfo()};
return std::pair{object_info, current_bucket_holder->getBucketInfo()};
}
else
{
@ -270,12 +350,6 @@ StorageS3QueueSource::FileIterator::getNextKeyFromAcquiredBucket(size_t processo
}
else
{
if (bucket_holder_it->second)
{
bucket_holder_it->second->release();
bucket_holder_it->second.reset();
}
LOG_TEST(log, "Reached the end of file iterator");
iterator_finished = true;
@ -301,7 +375,12 @@ StorageS3QueueSource::StorageS3QueueSource(
const std::atomic<bool> & table_is_being_dropped_,
std::shared_ptr<S3QueueLog> s3_queue_log_,
const StorageID & storage_id_,
LoggerPtr log_)
LoggerPtr log_,
size_t max_processed_files_before_commit_,
size_t max_processed_rows_before_commit_,
size_t max_processed_bytes_before_commit_,
size_t max_processing_time_sec_before_commit_,
bool commit_once_processed_)
: ISource(header_)
, WithContext(context_)
, name(std::move(name_))
@ -314,6 +393,11 @@ StorageS3QueueSource::StorageS3QueueSource(
, table_is_being_dropped(table_is_being_dropped_)
, s3_queue_log(s3_queue_log_)
, storage_id(storage_id_)
, max_processed_files_before_commit(max_processed_files_before_commit_)
, max_processed_rows_before_commit(max_processed_rows_before_commit_)
, max_processed_bytes_before_commit(max_processed_bytes_before_commit_)
, max_processing_time_sec_before_commit(max_processing_time_sec_before_commit_)
, commit_once_processed(commit_once_processed_)
, remove_file_func(remove_file_func_)
, log(log_)
{
@ -329,24 +413,52 @@ void StorageS3QueueSource::lazyInitialize(size_t processor)
if (initialized)
return;
LOG_TEST(log, "Initializing a new reader");
internal_source->lazyInitialize(processor);
reader = std::move(internal_source->reader);
if (reader)
reader_future = std::move(internal_source->reader_future);
initialized = true;
}
Chunk StorageS3QueueSource::generate()
{
Chunk chunk;
try
{
chunk = generateImpl();
}
catch (...)
{
if (commit_once_processed)
commit(false, getCurrentExceptionMessage(true));
throw;
}
if (!chunk && commit_once_processed)
{
commit(true);
}
return chunk;
}
Chunk StorageS3QueueSource::generateImpl()
{
lazyInitialize(processor_id);
while (true)
{
if (!reader)
{
LOG_TEST(log, "No reader");
break;
}
const auto * object_info = dynamic_cast<const S3QueueObjectInfo *>(&reader.getObjectInfo());
auto file_metadata = object_info->processing_holder;
const auto * object_info = dynamic_cast<const S3QueueObjectInfo *>(reader.getObjectInfo().get());
auto file_metadata = object_info->file_metadata;
auto file_status = file_metadata->getFileStatus();
if (isCancelled())
@ -357,7 +469,7 @@ Chunk StorageS3QueueSource::generate()
{
try
{
file_metadata->setFailed("Cancelled");
file_metadata->setFailed("Cancelled", /* reduce_retry_count */true, /* overwrite_status */false);
}
catch (...)
{
@ -365,16 +477,19 @@ Chunk StorageS3QueueSource::generate()
object_info->relative_path, getCurrentExceptionMessage(true));
}
appendLogElement(reader.getObjectInfo().getPath(), *file_status, processed_rows_from_file, false);
appendLogElement(reader.getObjectInfo()->getPath(), *file_status, processed_rows_from_file, false);
}
LOG_TEST(log, "Query is cancelled");
break;
}
const auto & path = reader.getObjectInfo().getPath();
const auto & path = reader.getObjectInfo()->getPath();
if (shutdown_called)
{
LOG_TEST(log, "Shutdown called");
if (processed_rows_from_file == 0)
break;
@ -386,7 +501,7 @@ Chunk StorageS3QueueSource::generate()
try
{
file_metadata->setFailed("Table is dropped");
file_metadata->setFailed("Table is dropped", /* reduce_retry_count */true, /* overwrite_status */false);
}
catch (...)
{
@ -420,15 +535,16 @@ Chunk StorageS3QueueSource::generate()
file_status->processed_rows += chunk.getNumRows();
processed_rows_from_file += chunk.getNumRows();
total_processed_rows += chunk.getNumRows();
total_processed_bytes += chunk.bytes();
VirtualColumnUtils::addRequestedFileLikeStorageVirtualsToChunk(
chunk, requested_virtual_columns,
{
.path = path,
.size = reader.getObjectInfo().metadata->size_bytes
.size = reader.getObjectInfo()->metadata->size_bytes
});
return chunk;
}
}
@ -437,22 +553,84 @@ Chunk StorageS3QueueSource::generate()
const auto message = getCurrentExceptionMessage(true);
LOG_ERROR(log, "Got an error while pulling chunk. Will set file {} as failed. Error: {} ", path, message);
file_metadata->setFailed(message);
failed_during_read_files.push_back(file_metadata);
file_status->onFailed(getCurrentExceptionMessage(true));
appendLogElement(path, *file_status, processed_rows_from_file, false);
if (processed_rows_from_file == 0)
{
auto * file_iterator = dynamic_cast<FileIterator *>(internal_source->file_iterator.get());
chassert(file_iterator);
if (file_status->retries < file_metadata->getMaxTries())
file_iterator->returnForRetry(reader.getObjectInfo());
/// If we did not process any rows from the failed file,
/// commit all previously processed files,
/// so as not to lose the work already done.
return {};
}
throw;
}
file_metadata->setProcessed();
applyActionAfterProcessing(reader.getObjectInfo().relative_path);
appendLogElement(path, *file_status, processed_rows_from_file, true);
file_status->setProcessingEndTime();
file_status.reset();
processed_rows_from_file = 0;
processed_files.push_back(file_metadata);
if (processed_files.size() == max_processed_files_before_commit)
{
LOG_TRACE(log, "Number of max processed files before commit reached "
"(rows: {}, bytes: {}, files: {})",
total_processed_rows, total_processed_bytes, processed_files.size());
break;
}
bool rows_or_bytes_or_time_limit_reached = false;
if (max_processed_rows_before_commit
&& total_processed_rows == max_processed_rows_before_commit)
{
LOG_TRACE(log, "Number of max processed rows before commit reached "
"(rows: {}, bytes: {}, files: {})",
total_processed_rows, total_processed_bytes, processed_files.size());
rows_or_bytes_or_time_limit_reached = true;
}
else if (max_processed_bytes_before_commit
&& total_processed_bytes == max_processed_bytes_before_commit)
{
LOG_TRACE(log, "Number of max processed bytes before commit reached "
"(rows: {}, bytes: {}, files: {})",
total_processed_rows, total_processed_bytes, processed_files.size());
rows_or_bytes_or_time_limit_reached = true;
}
else if (max_processing_time_sec_before_commit
&& total_stopwatch.elapsedSeconds() >= max_processing_time_sec_before_commit)
{
LOG_TRACE(log, "Max processing time before commit reached "
"(rows: {}, bytes: {}, files: {})",
total_processed_rows, total_processed_bytes, processed_files.size());
rows_or_bytes_or_time_limit_reached = true;
}
if (rows_or_bytes_or_time_limit_reached)
{
if (!reader_future.valid())
break;
LOG_TRACE(log, "Rows or bytes limit reached, but we have one more file scheduled already, "
"will process it despite the limit");
}
if (shutdown_called)
{
LOG_INFO(log, "Shutdown was called, stopping sync");
LOG_TRACE(log, "Shutdown was called, stopping sync");
break;
}
@ -460,19 +638,55 @@ Chunk StorageS3QueueSource::generate()
reader = reader_future.get();
if (!reader)
{
LOG_TEST(log, "Reader finished");
break;
}
file_status = files_metadata->getFileStatus(reader.getObjectInfo().getPath());
file_status = files_metadata->getFileStatus(reader.getObjectInfo()->getPath());
if (!rows_or_bytes_or_time_limit_reached && processed_files.size() + 1 < max_processed_files_before_commit)
{
/// Even if the task is finished, the thread may not be freed in the pool yet.
/// So wait until it will be freed before scheduling a new task.
internal_source->create_reader_pool->wait();
reader_future = internal_source->createReaderAsync(processor_id);
}
}
return {};
}
void StorageS3QueueSource::commit(bool success, const std::string & exception_message)
{
LOG_TEST(log, "Having {} files to set as {}, failed files: {}",
processed_files.size(), success ? "Processed" : "Failed", failed_during_read_files.size());
for (const auto & file_metadata : processed_files)
{
if (success)
{
file_metadata->setProcessed();
applyActionAfterProcessing(file_metadata->getPath());
}
else
file_metadata->setFailed(
exception_message,
/* reduce_retry_count */false,
/* overwrite_status */true);
}
for (const auto & file_metadata : failed_during_read_files)
{
/// The `exception` passed to commit() comes from the insertion into the storage.
/// We do not use it here, as failed_during_read_files were not inserted into the storage but skipped.
file_metadata->setFailed(
file_metadata->getFileStatus()->getException(),
/* reduce_retry_count */true,
/* overwrite_status */false);
}
}
void StorageS3QueueSource::applyActionAfterProcessing(const String & path)
{
switch (action)
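The generate()/generateImpl() split above implements a commit-once-per-source contract. A generic sketch of that control flow, with std::optional<int> standing in for Chunk and a callback standing in for the Keeper-backed commit():

#include <exception>
#include <functional>
#include <optional>
#include <string>

std::optional<int> generateWithCommit(
    const std::function<std::optional<int>()> & generate_impl,
    const std::function<void(bool success, const std::string & error)> & commit,
    bool commit_once_processed)
{
    std::optional<int> chunk;
    try
    {
        chunk = generate_impl();
    }
    catch (...)
    {
        // On failure, mark everything processed so far as failed before rethrowing.
        if (commit_once_processed)
            commit(false, "exception while generating");
        throw;
    }

    // An empty result means the source is exhausted: commit the processed files.
    if (!chunk && commit_once_processed)
        commit(true, "");

    return chunk;
}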

View File

@ -20,24 +20,18 @@ class StorageS3QueueSource : public ISource, WithContext
{
public:
using Storage = StorageObjectStorage;
using ConfigurationPtr = Storage::ConfigurationPtr;
using GlobIterator = StorageObjectStorageSource::GlobIterator;
using ZooKeeperGetter = std::function<zkutil::ZooKeeperPtr()>;
using Source = StorageObjectStorageSource;
using RemoveFileFunc = std::function<void(std::string)>;
using FileStatusPtr = S3QueueMetadata::FileStatusPtr;
using ReaderHolder = StorageObjectStorageSource::ReaderHolder;
using Metadata = S3QueueMetadata;
using ObjectInfo = StorageObjectStorageSource::ObjectInfo;
using ObjectInfoPtr = std::shared_ptr<ObjectInfo>;
using ObjectInfos = std::vector<ObjectInfoPtr>;
using BucketHolderPtr = S3QueueOrderedFileMetadata::BucketHolderPtr;
using BucketHolder = S3QueueOrderedFileMetadata::BucketHolder;
struct S3QueueObjectInfo : public ObjectInfo
struct S3QueueObjectInfo : public Source::ObjectInfo
{
S3QueueObjectInfo(
const ObjectInfo & object_info,
Metadata::FileMetadataPtr processing_holder_);
const Source::ObjectInfo & object_info,
S3QueueMetadata::FileMetadataPtr file_metadata_);
Metadata::FileMetadataPtr processing_holder;
S3QueueMetadata::FileMetadataPtr file_metadata;
};
class FileIterator : public StorageObjectStorageSource::IIterator
@ -45,39 +39,59 @@ public:
public:
FileIterator(
std::shared_ptr<S3QueueMetadata> metadata_,
std::unique_ptr<GlobIterator> glob_iterator_,
std::unique_ptr<Source::GlobIterator> glob_iterator_,
std::atomic<bool> & shutdown_called_,
LoggerPtr logger_);
bool isFinished() const;
/// Note:
/// List results in s3 are always returned in UTF-8 binary order.
/// (https://docs.aws.amazon.com/AmazonS3/latest/userguide/ListingKeysUsingAPIs.html)
ObjectInfoPtr nextImpl(size_t processor) override;
Source::ObjectInfoPtr nextImpl(size_t processor) override;
size_t estimatedKeysCount() override;
/// If the key was taken from iterator via next() call,
/// we might later want to return it back for retrying.
void returnForRetry(Source::ObjectInfoPtr object_info);
/// Release held buckets.
/// In fact, they could be released in the destructors of BucketHolder,
/// but we try to release them explicitly anyway,
/// because we want to be able to rethrow exceptions if they happen.
void releaseFinishedBuckets();
private:
using Bucket = S3QueueMetadata::Bucket;
using Processor = S3QueueMetadata::Processor;
const std::shared_ptr<S3QueueMetadata> metadata;
const std::unique_ptr<GlobIterator> glob_iterator;
const std::unique_ptr<Source::GlobIterator> glob_iterator;
std::atomic<bool> & shutdown_called;
std::mutex mutex;
LoggerPtr log;
std::mutex buckets_mutex;
struct ListedKeys
{
std::deque<ObjectInfoPtr> keys;
std::deque<Source::ObjectInfoPtr> keys;
std::optional<Processor> processor;
};
/// A cache of keys which were iterated via glob_iterator, but not taken for processing.
std::unordered_map<Bucket, ListedKeys> listed_keys_cache;
bool iterator_finished = false;
std::unordered_map<size_t, S3QueueOrderedFileMetadata::BucketHolderPtr> bucket_holders;
std::pair<ObjectInfoPtr, S3QueueOrderedFileMetadata::BucketInfoPtr> getNextKeyFromAcquiredBucket(size_t processor);
/// We store a vector of holders, because we cannot release them until processed files are committed.
std::unordered_map<size_t, std::vector<BucketHolderPtr>> bucket_holders;
/// Is glob_iterator finished?
std::atomic_bool iterator_finished = false;
/// Only for processing without buckets.
std::deque<Source::ObjectInfoPtr> objects_to_retry;
std::pair<Source::ObjectInfoPtr, S3QueueOrderedFileMetadata::BucketInfoPtr> getNextKeyFromAcquiredBucket(size_t processor);
bool hasKeysForProcessor(const Processor & processor) const;
};
StorageS3QueueSource(
@ -94,7 +108,12 @@ public:
const std::atomic<bool> & table_is_being_dropped_,
std::shared_ptr<S3QueueLog> s3_queue_log_,
const StorageID & storage_id_,
LoggerPtr log_);
LoggerPtr log_,
size_t max_processed_files_before_commit_,
size_t max_processed_rows_before_commit_,
size_t max_processed_bytes_before_commit_,
size_t max_processing_time_sec_before_commit_,
bool commit_once_processed_);
static Block getHeader(Block sample_block, const std::vector<NameAndTypePair> & requested_virtual_columns);
@ -102,6 +121,10 @@ public:
Chunk generate() override;
/// Commit files after the insertion into the storage has finished.
/// `success` defines whether insertion was successful or not.
void commit(bool success, const std::string & exception_message = {});
private:
const String name;
const size_t processor_id;
@ -113,17 +136,29 @@ private:
const std::atomic<bool> & table_is_being_dropped;
const std::shared_ptr<S3QueueLog> s3_queue_log;
const StorageID storage_id;
const size_t max_processed_files_before_commit;
const size_t max_processed_rows_before_commit;
const size_t max_processed_bytes_before_commit;
const size_t max_processing_time_sec_before_commit;
const bool commit_once_processed;
RemoveFileFunc remove_file_func;
LoggerPtr log;
ReaderHolder reader;
std::future<ReaderHolder> reader_future;
std::vector<S3QueueMetadata::FileMetadataPtr> processed_files;
std::vector<S3QueueMetadata::FileMetadataPtr> failed_during_read_files;
Source::ReaderHolder reader;
std::future<Source::ReaderHolder> reader_future;
std::atomic<bool> initialized{false};
size_t processed_rows_from_file = 0;
size_t total_processed_rows = 0;
size_t total_processed_bytes = 0;
S3QueueOrderedFileMetadata::BucketHolderPtr current_bucket_holder;
Stopwatch total_stopwatch {CLOCK_MONOTONIC_COARSE};
Chunk generateImpl();
void applyActionAfterProcessing(const String & path);
void appendLogElement(const std::string & filename, S3QueueMetadata::FileStatus & file_status_, size_t processed_rows, bool processed);
void lazyInitialize(size_t processor);
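A toy model of the retry queue behind returnForRetry(): keys handed back are served again before anything newly listed. Types are placeholders, not the real ObjectInfo machinery:

#include <deque>
#include <memory>
#include <string>

struct RetryAwareIterator
{
    using ObjectPtr = std::shared_ptr<std::string>;  // stands in for Source::ObjectInfoPtr

    std::deque<ObjectPtr> objects_to_retry;

    ObjectPtr next(ObjectPtr freshly_listed)
    {
        if (!objects_to_retry.empty())
        {
            ObjectPtr object = objects_to_retry.front();
            objects_to_retry.pop_front();
            return object;
        }
        return freshly_listed;  // fall back to the normal listing order
    }

    void returnForRetry(ObjectPtr object)
    {
        objects_to_retry.push_back(std::move(object));
    }
};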

View File

@ -130,8 +130,11 @@ void S3QueueUnorderedFileMetadata::setProcessedImpl()
const auto code = zk_client->tryMulti(requests, responses);
if (code == Coordination::Error::ZOK)
{
if (max_loading_retries)
zk_client->tryRemove(failed_node_path + ".retriable", -1);
if (max_loading_retries
&& zk_client->tryRemove(failed_node_path + ".retriable", -1) == Coordination::Error::ZOK)
{
LOG_TEST(log, "Removed node {}.retriable", failed_node_path);
}
LOG_TRACE(log, "Moved file `{}` to processed (node path: {})", path, processed_node_path);
return;

View File

@ -26,6 +26,7 @@
#include <Storages/prepareReadingFromFormat.h>
#include <Storages/ObjectStorage/S3/Configuration.h>
#include <Storages/ObjectStorage/Utils.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <filesystem>
@ -71,7 +72,12 @@ namespace
return zkutil::extractZooKeeperPath(result_zk_path, true);
}
void checkAndAdjustSettings(S3QueueSettings & s3queue_settings, const Settings & settings, bool is_attach)
void checkAndAdjustSettings(
S3QueueSettings & s3queue_settings,
const Settings & settings,
bool is_attach,
const LoggerPtr & log,
ASTStorage * engine_args)
{
if (!is_attach && !s3queue_settings.mode.changed)
{
@ -79,11 +85,6 @@ namespace
}
/// In case !is_attach, we leave Ordered mode as default for compatibility.
if (!s3queue_settings.s3queue_processing_threads_num)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Setting `s3queue_processing_threads_num` cannot be set to zero");
}
if (!s3queue_settings.s3queue_enable_logging_to_s3queue_log.changed)
{
s3queue_settings.s3queue_enable_logging_to_s3queue_log = settings.s3queue_enable_logging_to_s3queue_log;
@ -95,6 +96,21 @@ namespace
"Setting `s3queue_cleanup_interval_min_ms` ({}) must be less or equal to `s3queue_cleanup_interval_max_ms` ({})",
s3queue_settings.s3queue_cleanup_interval_min_ms, s3queue_settings.s3queue_cleanup_interval_max_ms);
}
if (!s3queue_settings.s3queue_processing_threads_num)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Setting `s3queue_processing_threads_num` cannot be set to zero");
}
if (!is_attach && !s3queue_settings.s3queue_processing_threads_num.changed)
{
s3queue_settings.s3queue_processing_threads_num = std::max<uint32_t>(getNumberOfPhysicalCPUCores(), 16);
engine_args->settings->as<ASTSetQuery>()->changes.insertSetting(
"s3queue_processing_threads_num",
s3queue_settings.s3queue_processing_threads_num.value);
LOG_TRACE(log, "Set `processing_threads_num` to {}", s3queue_settings.s3queue_processing_threads_num);
}
}
}
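
Note: the adjusted settings logic above rejects an explicit zero for `s3queue_processing_threads_num` and, when the setting is unset on CREATE, defaults it to max(physical core count, 16). A standalone sketch of that rule follows; it uses std::thread::hardware_concurrency() in place of ClickHouse's getNumberOfPhysicalCPUCores(), and the function name is invented for illustration.

    #include <algorithm>
    #include <cstdint>
    #include <iostream>
    #include <stdexcept>
    #include <thread>

    // "changed" plays the role of the setting's `changed` flag; unset values
    // carry a nonzero built-in default, so the zero check only fires for an
    // explicit user-provided zero.
    uint32_t resolveProcessingThreads(uint32_t value, bool changed, bool is_attach)
    {
        if (value == 0)
            throw std::invalid_argument("s3queue_processing_threads_num cannot be zero");
        if (!is_attach && !changed)
            return std::max<uint32_t>(std::thread::hardware_concurrency(), 16);
        return value;
    }

    int main()
    {
        // Unset on CREATE (not ATTACH): falls back to max(core count, 16).
        std::cout << resolveProcessingThreads(1, /* changed */ false, /* is_attach */ false) << '\n';
    }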
@ -107,7 +123,7 @@ StorageS3Queue::StorageS3Queue(
const String & comment,
ContextPtr context_,
std::optional<FormatSettings> format_settings_,
ASTStorage * /* engine_args */,
ASTStorage * engine_args,
LoadingStrictnessLevel mode)
: IStorage(table_id_)
, WithContext(context_)
@ -131,7 +147,7 @@ StorageS3Queue::StorageS3Queue(
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "S3Queue url must either end with '/' or contain globs");
}
checkAndAdjustSettings(*s3queue_settings, context_->getSettingsRef(), mode > LoadingStrictnessLevel::CREATE);
checkAndAdjustSettings(*s3queue_settings, context_->getSettingsRef(), mode > LoadingStrictnessLevel::CREATE, log, engine_args);
object_storage = configuration->createObjectStorage(context_, /* is_readonly */true);
FormatFactory::instance().checkFormatName(configuration->format);
@ -305,10 +321,12 @@ void ReadFromS3Queue::initializePipeline(QueryPipelineBuilder & pipeline, const
createIterator(nullptr);
for (size_t i = 0; i < adjusted_num_streams; ++i)
pipes.emplace_back(storage->createSource(
i,
i/* processor_id */,
info,
iterator,
max_block_size, context));
max_block_size,
context,
true/* commit_once_processed */));
auto pipe = Pipe::unitePipes(std::move(pipes));
if (pipe.empty())
@ -325,7 +343,8 @@ std::shared_ptr<StorageS3QueueSource> StorageS3Queue::createSource(
const ReadFromFormatInfo & info,
std::shared_ptr<StorageS3Queue::FileIterator> file_iterator,
size_t max_block_size,
ContextPtr local_context)
ContextPtr local_context,
bool commit_once_processed)
{
auto internal_source = std::make_unique<StorageObjectStorageSource>(
getName(),
@ -358,7 +377,12 @@ std::shared_ptr<StorageS3QueueSource> StorageS3Queue::createSource(
table_is_being_dropped,
s3_queue_log,
getStorageID(),
log);
log,
s3queue_settings->s3queue_max_processed_files_before_commit,
s3queue_settings->s3queue_max_processed_rows_before_commit,
s3queue_settings->s3queue_max_processed_bytes_before_commit,
s3queue_settings->s3queue_max_processing_time_sec_before_commit,
commit_once_processed);
}
bool StorageS3Queue::hasDependencies(const StorageID & table_id)
@ -433,34 +457,51 @@ void StorageS3Queue::threadFunc()
bool StorageS3Queue::streamToViews()
{
// Create a stream for each consumer and join them in a union stream
// Only insert into dependent views and expect that input blocks contain virtual columns
auto table_id = getStorageID();
auto table = DatabaseCatalog::instance().getTable(table_id, getContext());
if (!table)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Engine table {} doesn't exist.", table_id.getNameForLogs());
auto storage_snapshot = getStorageSnapshot(getInMemoryMetadataPtr(), getContext());
// Create an INSERT query for streaming data
auto insert = std::make_shared<ASTInsertQuery>();
insert->table_id = table_id;
auto storage_snapshot = getStorageSnapshot(getInMemoryMetadataPtr(), getContext());
auto s3queue_context = Context::createCopy(getContext());
s3queue_context->makeQueryContext();
// Create a stream for each consumer and join them in a union stream
// Only insert into dependent views and expect that input blocks contain virtual columns
auto file_iterator = createFileIterator(s3queue_context, nullptr);
size_t total_rows = 0;
while (!shutdown_called && !file_iterator->isFinished())
{
InterpreterInsertQuery interpreter(insert, s3queue_context, false, true, true);
auto block_io = interpreter.execute();
auto file_iterator = createFileIterator(s3queue_context, nullptr);
auto read_from_format_info = prepareReadingFromFormat(block_io.pipeline.getHeader().getNames(), storage_snapshot, supportsSubsetOfColumns(s3queue_context));
auto read_from_format_info = prepareReadingFromFormat(
block_io.pipeline.getHeader().getNames(),
storage_snapshot,
supportsSubsetOfColumns(s3queue_context));
Pipes pipes;
std::vector<std::shared_ptr<StorageS3QueueSource>> sources;
pipes.reserve(s3queue_settings->s3queue_processing_threads_num);
sources.reserve(s3queue_settings->s3queue_processing_threads_num);
for (size_t i = 0; i < s3queue_settings->s3queue_processing_threads_num; ++i)
{
auto source = createSource(i, read_from_format_info, file_iterator, DBMS_DEFAULT_BUFFER_SIZE, s3queue_context);
pipes.emplace_back(std::move(source));
auto source = createSource(
i/* processor_id */,
read_from_format_info,
file_iterator,
DBMS_DEFAULT_BUFFER_SIZE,
s3queue_context,
false/* commit_once_processed */);
pipes.emplace_back(source);
sources.emplace_back(source);
}
auto pipe = Pipe::unitePipes(std::move(pipes));
@ -471,10 +512,28 @@ bool StorageS3Queue::streamToViews()
std::atomic_size_t rows = 0;
block_io.pipeline.setProgressCallback([&](const Progress & progress) { rows += progress.read_rows.load(); });
try
{
CompletedPipelineExecutor executor(block_io.pipeline);
executor.execute();
}
catch (...)
{
for (auto & source : sources)
source->commit(/* success */false, getCurrentExceptionMessage(true));
return rows > 0;
file_iterator->releaseFinishedBuckets();
throw;
}
for (auto & source : sources)
source->commit(/* success */true);
file_iterator->releaseFinishedBuckets();
total_rows += rows;
}
return total_rows > 0;
}
zkutil::ZooKeeperPtr StorageS3Queue::getZooKeeper() const
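
Note: the reworked streamToViews() loop above commits every source after a successful pipeline run and, on any exception, marks all sources as failed before rethrowing. The toy, self-contained illustration below shows that commit-or-rollback pattern; `ToySource` and `runBatch` are made-up stand-ins, not the real source or executor types.

    #include <functional>
    #include <iostream>
    #include <string>
    #include <vector>

    struct ToySource
    {
        void commit(bool success, const std::string & error = {})
        {
            std::cout << (success ? "committed" : "rolled back: " + error) << '\n';
        }
    };

    // Run the batch; on failure roll back every source and rethrow, on
    // success commit every source.
    void runBatch(std::vector<ToySource> & sources, const std::function<void()> & execute)
    {
        try
        {
            execute();
        }
        catch (const std::exception & e)
        {
            for (auto & source : sources)
                source.commit(/* success */ false, e.what());
            throw;
        }
        for (auto & source : sources)
            source.commit(/* success */ true);
    }

    int main()
    {
        std::vector<ToySource> sources(2);
        runBatch(sources, [] { /* the INSERT pipeline would run here */ });
    }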

View File

@ -88,7 +88,8 @@ private:
const ReadFromFormatInfo & info,
std::shared_ptr<StorageS3Queue::FileIterator> file_iterator,
size_t max_block_size,
ContextPtr local_context);
ContextPtr local_context,
bool commit_once_processed);
bool hasDependencies(const StorageID & table_id);
bool streamToViews();

View File

@ -1789,7 +1789,8 @@ public:
void onCancel() override
{
std::lock_guard cancel_lock(cancel_mutex);
finalize();
cancelBuffers();
releaseBuffers();
cancelled = true;
}
@ -1803,18 +1804,18 @@ public:
catch (...)
{
/// An exception context is needed to proper delete write buffers without finalization
release();
releaseBuffers();
}
}
void onFinish() override
{
std::lock_guard cancel_lock(cancel_mutex);
finalize();
finalizeBuffers();
}
private:
void finalize()
void finalizeBuffers()
{
if (!writer)
return;
@ -1827,19 +1828,27 @@ private:
catch (...)
{
/// Stop ParallelFormattingOutputFormat correctly.
release();
releaseBuffers();
throw;
}
write_buf->finalize();
}
void release()
void releaseBuffers()
{
writer.reset();
write_buf.reset();
}
void cancelBuffers()
{
if (writer)
writer->cancel();
if (write_buf)
write_buf->cancel();
}
StorageMetadataPtr metadata_snapshot;
String table_name_for_log;

View File

@ -322,6 +322,10 @@ public:
/// Rollback partial writes.
/// No more writing.
for (auto & [_, stream] : streams)
{
stream.cancel();
}
streams.clear();
/// Truncate files to the older sizes.
@ -373,6 +377,12 @@ private:
plain->next();
plain->finalize();
}
void cancel()
{
compressed.cancel();
plain->cancel();
}
};
using FileStreams = std::map<String, Stream>;

View File

@ -97,8 +97,7 @@ void SetOrJoinSink::onFinish()
if (persistent)
{
backup_stream.flush();
compressed_backup_buf.next();
backup_buf->next();
compressed_backup_buf.finalize();
backup_buf->finalize();
table.disk->replaceFile(fs::path(backup_tmp_path) / backup_file_name, fs::path(backup_path) / backup_file_name);

View File

@ -207,7 +207,10 @@ public:
/// Rollback partial writes.
/// No more writing.
data_out->cancel();
data_out.reset();
data_out_compressed->cancel();
data_out_compressed.reset();
/// Truncate files to the older sizes.
@ -233,8 +236,7 @@ public:
if (done)
return;
data_out->next();
data_out_compressed->next();
data_out->finalize();
data_out_compressed->finalize();
/// Save the new indices.
@ -494,8 +496,7 @@ void StorageStripeLog::saveIndices(const WriteLock & /* already locked for writi
for (size_t i = start; i != num_indices; ++i)
indices.blocks[i].write(*index_out);
index_out->next();
index_out_compressed->next();
index_out->finalize();
index_out_compressed->finalize();
num_indices_saved = num_indices;
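
Note: several sinks above (SetOrJoinSink, StorageStripeLog, StorageLog) replace trailing next() calls with finalize() and cancel partially written streams on rollback. The toy buffer below illustrates the intended difference under that assumption: next() only flushes pending bytes, while finalize() flushes and marks the buffer complete so nothing is left for the destructor. `ToyBuffer` is invented for the example and is not ClickHouse's WriteBuffer.

    #include <iostream>
    #include <string>
    #include <vector>

    struct ToyBuffer
    {
        std::string name;
        std::vector<char> pending;
        bool finalized = false;

        explicit ToyBuffer(std::string n) : name(std::move(n)) {}

        void write(char c) { pending.push_back(c); }
        void next()
        {
            std::cout << name << ": flushed " << pending.size() << " bytes\n";
            pending.clear();
        }
        void finalize()
        {
            next();
            finalized = true;
            std::cout << name << ": finalized\n";
        }
        void cancel()
        {
            pending.clear();
            finalized = true;   // data is discarded, nothing more to write
            std::cout << name << ": cancelled\n";
        }
    };

    int main()
    {
        ToyBuffer compressed("compressed"), plain("plain");
        compressed.write('x');
        plain.write('y');

        // Pattern from the diff: complete the inner buffer before the outer one.
        compressed.finalize();
        plain.finalize();
    }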

View File

@ -576,31 +576,25 @@ void StorageURLSink::consume(Chunk chunk)
void StorageURLSink::onCancel()
{
std::lock_guard lock(cancel_mutex);
finalize();
cancelBuffers();
releaseBuffers();
cancelled = true;
}
void StorageURLSink::onException(std::exception_ptr exception)
void StorageURLSink::onException(std::exception_ptr)
{
std::lock_guard lock(cancel_mutex);
try
{
std::rethrow_exception(exception);
}
catch (...)
{
/// An exception context is needed to proper delete write buffers without finalization
release();
}
cancelBuffers();
releaseBuffers();
}
void StorageURLSink::onFinish()
{
std::lock_guard lock(cancel_mutex);
finalize();
finalizeBuffers();
}
void StorageURLSink::finalize()
void StorageURLSink::finalizeBuffers()
{
if (!writer)
return;
@ -613,19 +607,27 @@ void StorageURLSink::finalize()
catch (...)
{
/// Stop ParallelFormattingOutputFormat correctly.
release();
releaseBuffers();
throw;
}
write_buf->finalize();
}
void StorageURLSink::release()
void StorageURLSink::releaseBuffers()
{
writer.reset();
write_buf.reset();
}
void StorageURLSink::cancelBuffers()
{
if (writer)
writer->cancel();
if (write_buf)
write_buf->cancel();
}
class PartitionedStorageURLSink : public PartitionedSink
{
public:

View File

@ -257,8 +257,10 @@ public:
void onFinish() override;
private:
void finalize();
void release();
void finalizeBuffers();
void releaseBuffers();
void cancelBuffers();
std::unique_ptr<WriteBuffer> write_buf;
OutputFormatPtr writer;
std::mutex cancel_mutex;

View File

@ -26,6 +26,7 @@ ColumnsDescription StorageSystemS3Queue::getColumnsDescription()
return ColumnsDescription
{
{"zookeeper_path", std::make_shared<DataTypeString>(), "Path in zookeeper to S3Queue metadata"},
{"file_path", std::make_shared<DataTypeString>(), "File path of a file which is being processed by S3Queue"},
{"file_name", std::make_shared<DataTypeString>(), "File name of a file which is being processed by S3Queue"},
{"rows_processed", std::make_shared<DataTypeUInt64>(), "Currently processed number of rows"},
{"status", std::make_shared<DataTypeString>(), "Status of processing: Processed, Processing, Failed"},
@ -45,11 +46,12 @@ void StorageSystemS3Queue::fillData(MutableColumns & res_columns, ContextPtr, co
{
for (const auto & [zookeeper_path, metadata] : S3QueueMetadataFactory::instance().getAll())
{
for (const auto & [file_name, file_status] : metadata->getFileStatuses())
for (const auto & [file_path, file_status] : metadata->getFileStatuses())
{
size_t i = 0;
res_columns[i++]->insert(zookeeper_path);
res_columns[i++]->insert(file_name);
res_columns[i++]->insert(file_path);
res_columns[i++]->insert(std::filesystem::path(file_path).filename().string());
res_columns[i++]->insert(file_status->processed_rows.load());
res_columns[i++]->insert(magic_enum::enum_name(file_status->state.load()));
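
Note: the new `file_name` column above is derived from `file_path` via std::filesystem. A minimal standalone example of the same call (the path literal is made up):

    #include <filesystem>
    #include <iostream>

    int main()
    {
        std::filesystem::path file_path = "bucket/dir/data_2024.parquet";
        std::cout << file_path.filename().string() << '\n';   // data_2024.parquet
    }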

View File

@ -1068,9 +1068,10 @@ void StorageWindowView::threadFuncFireProc()
if (max_watermark >= timestamp_now)
clean_cache_task->schedule();
UInt64 next_fire_ms = static_cast<UInt64>(next_fire_signal) * 1000;
UInt64 timestamp_ms = static_cast<UInt64>(Poco::Timestamp().epochMicroseconds()) / 1000;
if (!shutdown_called)
fire_task->scheduleAfter(std::max(UInt64(0), static_cast<UInt64>(next_fire_signal) * 1000 - timestamp_ms));
fire_task->scheduleAfter(next_fire_ms - std::min(next_fire_ms, timestamp_ms));
}
void StorageWindowView::threadFuncFireEvent()
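
Note: the scheduling change above replaces `std::max(UInt64(0), next_fire_ms - timestamp_ms)` with `next_fire_ms - std::min(next_fire_ms, timestamp_ms)`. With unsigned arithmetic the first form wraps around when the fire time has already passed, producing an enormous delay instead of zero; the second form saturates at zero. A self-contained demonstration (the sample values are arbitrary):

    #include <algorithm>
    #include <cstdint>
    #include <iostream>

    int main()
    {
        uint64_t next_fire_ms = 1'000;
        uint64_t timestamp_ms = 1'500;   // the fire time has already passed

        // Broken: the unsigned subtraction wraps around first, so std::max
        // compares 0 against a huge number and keeps the huge number.
        uint64_t wrapped = std::max<uint64_t>(0, next_fire_ms - timestamp_ms);
        std::cout << wrapped << '\n';   // 18446744073709551116

        // Fixed: clamp before subtracting, which saturates the delay at zero.
        uint64_t clamped = next_fire_ms - std::min(next_fire_ms, timestamp_ms);
        std::cout << clamped << '\n';   // 0
    }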

View File

@ -64,11 +64,14 @@ def main():
+ ci_config["jobs_data"]["jobs_to_do"]
)
builds_for_check = [job for job in CI.BuildNames if job in all_ci_jobs]
print(f"NOTE: following build reports will be checked: [{builds_for_check}]")
print("NOTE: builds for check taken from ci configuration")
else:
builds_for_check = parse_args().reports
for job in builds_for_check:
assert job in CI.BuildNames, "Builds must be known build job names"
print("NOTE: builds for check taken from input arguments")
print(f"NOTE: following build reports will be checked: [{builds_for_check}]")
required_builds = len(builds_for_check)
missing_builds = 0

View File

@ -185,8 +185,7 @@ class JobNames(metaclass=WithIter):
LIBFUZZER_TEST = "libFuzzer tests"
BUILD_CHECK = "ClickHouse build check"
# BUILD_CHECK_SPECIAL = "ClickHouse special build check"
BUILD_CHECK = "Builds"
DOCS_CHECK = "Docs check"
BUGFIX_VALIDATE = "Bugfix validation"
@ -214,8 +213,12 @@ class StatusNames(metaclass=WithIter):
class SyncState(metaclass=WithIter):
PENDING = "awaiting merge"
MERGE_FAILED = "merge failed"
PENDING = "awaiting sync"
# temporary state if GH does not know mergeable state
MERGE_UNKNOWN = "unknown state (might be auto recoverable)"
# changes cannot be pushed/merged to a sync branch
PUSH_FAILED = "push failed"
MERGE_CONFLICTS = "merge conflicts"
TESTING = "awaiting test results"
TESTS_FAILED = "tests failed"
COMPLETED = "completed"
@ -331,7 +334,7 @@ class CommonJobConfigs:
"""
BUILD_REPORT = JobConfig(
job_name_keyword="build_check",
job_name_keyword="builds",
run_command="build_report_check.py",
digest=DigestConfig(
include_paths=[
@ -638,7 +641,7 @@ CHECK_DESCRIPTIONS = [
lambda x: x == "CI running",
),
CheckDescription(
"ClickHouse build check",
"Builds",
"Builds ClickHouse in various configurations for use in further steps. "
"You have to fix the builds that fail. Build logs often has enough "
"information to fix the error, but you might have to reproduce the failure "

View File

@ -18,8 +18,7 @@ from github.IssueComment import IssueComment
from github.Repository import Repository
from ci_config import CI
from env_helper import GITHUB_REPOSITORY, GITHUB_UPSTREAM_REPOSITORY, TEMP_PATH
from lambda_shared_package.lambda_shared.pr import Labels
from env_helper import GITHUB_REPOSITORY, TEMP_PATH
from pr_info import PRInfo
from report import (
ERROR,
@ -29,7 +28,6 @@ from report import (
StatusType,
TestResult,
TestResults,
get_status,
get_worst_status,
)
from s3_helper import S3Helper
@ -103,7 +101,12 @@ def post_commit_status(
if i == RETRY - 1:
raise ex
time.sleep(i)
if pr_info:
if pr_info and check_name not in (
CI.StatusNames.MERGEABLE,
CI.StatusNames.CI,
CI.StatusNames.PR_CHECK,
CI.StatusNames.SYNC,
):
status_updated = False
for i in range(RETRY):
try:
@ -157,6 +160,17 @@ def set_status_comment(commit: Commit, pr_info: PRInfo) -> None:
gh.__requester = commit._requester # type:ignore #pylint:disable=protected-access
repo = get_repo(gh)
statuses = sorted(get_commit_filtered_statuses(commit), key=lambda x: x.context)
statuses = [
status
for status in statuses
if status.context
not in (
CI.StatusNames.MERGEABLE,
CI.StatusNames.CI,
CI.StatusNames.PR_CHECK,
CI.StatusNames.SYNC,
)
]
if not statuses:
return
@ -439,29 +453,10 @@ def set_mergeable_check(
)
def update_mergeable_check(commit: Commit, pr_info: PRInfo, check_name: str) -> None:
"check if the check_name in REQUIRED_CHECKS and then trigger update"
not_run = (
pr_info.labels.intersection({Labels.SKIP_MERGEABLE_CHECK, Labels.RELEASE})
or not CI.is_required(check_name)
or pr_info.release_pr
or pr_info.number == 0
)
if not_run:
# Let's avoid unnecessary work
return
logging.info("Update Mergeable Check by %s", check_name)
statuses = get_commit_filtered_statuses(commit)
trigger_mergeable_check(commit, statuses)
def trigger_mergeable_check(
commit: Commit,
statuses: CommitStatuses,
set_if_green: bool = False,
set_from_sync: bool = False,
workflow_failed: bool = False,
) -> StatusType:
"""calculate and update StatusNames.MERGEABLE"""
@ -501,63 +496,43 @@ def trigger_mergeable_check(
description = format_description(description)
if not set_if_green and state == SUCCESS:
# do not set green Mergeable Check status
pass
else:
if mergeable_status is None or mergeable_status.description != description:
if set_from_sync:
# update Mergeable Check from sync WF only if its status already present or its new status is not SUCCESS
# to avoid false-positives
if mergeable_status or state != SUCCESS:
set_mergeable_check(commit, description, state)
elif mergeable_status is None or mergeable_status.description != description:
set_mergeable_check(commit, description, state)
return state
def update_upstream_sync_status(
upstream_pr_number: int,
sync_pr_number: int,
gh: Github,
pr_info: PRInfo,
state: StatusType,
can_set_green_mergeable_status: bool = False,
) -> None:
upstream_repo = gh.get_repo(GITHUB_UPSTREAM_REPOSITORY)
upstream_pr = upstream_repo.get_pull(upstream_pr_number)
sync_repo = gh.get_repo(GITHUB_REPOSITORY)
sync_pr = sync_repo.get_pull(sync_pr_number)
# Find the commit that is in both repos, upstream and cloud
sync_commits = sync_pr.get_commits().reversed
upstream_commits = upstream_pr.get_commits().reversed
# Github objects are compared by _url attribute. We can't compare them directly and
# should compare commits by SHA1
upstream_shas = [c.sha for c in upstream_commits]
logging.info("Commits in upstream PR:\n %s", ", ".join(upstream_shas))
sync_shas = [c.sha for c in sync_commits]
logging.info("Commits in sync PR:\n %s", ", ".join(reversed(sync_shas)))
last_synced_upstream_commit = pr_info.get_latest_sync_commit()
# find latest synced commit
last_synced_upstream_commit = None
for commit in upstream_commits:
if commit.sha in sync_shas:
last_synced_upstream_commit = commit
break
assert last_synced_upstream_commit
sync_status = get_status(state)
logging.info(
"Using commit %s to post the %s status `%s`: [%s]",
"Using commit [%s] to post the [%s] status [%s]",
last_synced_upstream_commit.sha,
sync_status,
state,
CI.StatusNames.SYNC,
"",
)
if state == SUCCESS:
description = CI.SyncState.COMPLETED
else:
description = CI.SyncState.TESTS_FAILED
post_commit_status(
last_synced_upstream_commit,
sync_status,
"",
state,
"",
description,
CI.StatusNames.SYNC,
)
trigger_mergeable_check(
last_synced_upstream_commit,
get_commit_filtered_statuses(last_synced_upstream_commit),
set_if_green=can_set_green_mergeable_status,
set_from_sync=True,
)

View File

@ -9,15 +9,10 @@ from commit_status_helper import (
get_commit,
get_commit_filtered_statuses,
post_commit_status,
set_mergeable_check,
trigger_mergeable_check,
update_upstream_sync_status,
)
from env_helper import GITHUB_REPOSITORY, GITHUB_UPSTREAM_REPOSITORY
from get_robot_token import get_best_robot_token
from pr_info import PRInfo
from report import FAILURE, PENDING, SUCCESS, StatusType
from synchronizer_utils import SYNC_BRANCH_PREFIX
def parse_args() -> argparse.Namespace:
@ -45,31 +40,7 @@ def main():
gh = Github(get_best_robot_token(), per_page=100)
commit = get_commit(gh, pr_info.sha)
if pr_info.is_merge_queue:
# in MQ Mergeable check status must never be green if any failures in the workflow
if has_workflow_failures:
set_mergeable_check(commit, "workflow failed", FAILURE)
else:
# This must be the only place where green MCheck is set in the MQ (in the end of CI) to avoid early merge
set_mergeable_check(commit, "workflow passed", SUCCESS)
return
statuses = get_commit_filtered_statuses(commit)
state = trigger_mergeable_check(commit, statuses, set_if_green=True)
# Process upstream StatusNames.SYNC
if (
pr_info.head_ref.startswith(f"{SYNC_BRANCH_PREFIX}/pr/")
and GITHUB_REPOSITORY != GITHUB_UPSTREAM_REPOSITORY
):
upstream_pr_number = int(pr_info.head_ref.split("/pr/", maxsplit=1)[1])
update_upstream_sync_status(
upstream_pr_number,
pr_info.number,
gh,
state,
can_set_green_mergeable_status=True,
)
ci_running_statuses = [s for s in statuses if s.context == CI.StatusNames.CI]
if not ci_running_statuses:

View File

@ -4,6 +4,7 @@
import argparse
import logging
import sys
from datetime import datetime
from os import getenv
from pprint import pformat
@ -17,11 +18,14 @@ from commit_status_helper import (
get_commit_filtered_statuses,
get_commit,
trigger_mergeable_check,
update_upstream_sync_status,
)
from get_robot_token import get_best_robot_token
from github_helper import GitHub, NamedUser, PullRequest, Repository
from pr_info import PRInfo
from report import SUCCESS
from report import SUCCESS, FAILURE
from env_helper import GITHUB_UPSTREAM_REPOSITORY, GITHUB_REPOSITORY
from synchronizer_utils import SYNC_BRANCH_PREFIX
# The team name for accepted approvals
TEAM_NAME = getenv("GITHUB_TEAM_NAME", "core")
@ -243,17 +247,29 @@ def main():
repo = gh.get_repo(args.repo)
if args.set_ci_status:
assert args.wf_status in ("failure", "success")
assert args.wf_status in (FAILURE, SUCCESS)
# set mergeable check status and exit
commit = get_commit(gh, args.pr_info.sha)
statuses = get_commit_filtered_statuses(commit)
trigger_mergeable_check(
state = trigger_mergeable_check(
commit,
statuses,
set_if_green=True,
workflow_failed=(args.wf_status != "success"),
)
return
# Process upstream StatusNames.SYNC
pr_info = PRInfo()
if (
pr_info.head_ref.startswith(f"{SYNC_BRANCH_PREFIX}/pr/")
and GITHUB_REPOSITORY != GITHUB_UPSTREAM_REPOSITORY
):
print("Updating upstream statuses")
update_upstream_sync_status(pr_info, state)
if args.wf_status != "success":
# exit with 1 to rerun on workflow failed job restart
sys.exit(1)
sys.exit(0)
# An ugly and not nice fix to patch the wrong organization URL,
# see https://github.com/PyGithub/PyGithub/issues/2395#issuecomment-1378629710

View File

@ -13,8 +13,11 @@ from env_helper import (
GITHUB_REPOSITORY,
GITHUB_RUN_URL,
GITHUB_SERVER_URL,
GITHUB_UPSTREAM_REPOSITORY,
)
from lambda_shared_package.lambda_shared.pr import Labels
from get_robot_token import get_best_robot_token
from github_helper import GitHub
NeedsDataType = Dict[str, Dict[str, Union[str, Dict[str, str]]]]
@ -432,6 +435,34 @@ class PRInfo:
return True
return False
def get_latest_sync_commit(self):
gh = GitHub(get_best_robot_token(), per_page=100)
assert self.head_ref.startswith("sync-upstream/pr/")
assert self.repo_full_name != GITHUB_UPSTREAM_REPOSITORY
upstream_repo = gh.get_repo(GITHUB_UPSTREAM_REPOSITORY)
upstream_pr_number = int(self.head_ref.split("/pr/", maxsplit=1)[1])
upstream_pr = upstream_repo.get_pull(upstream_pr_number)
sync_repo = gh.get_repo(GITHUB_REPOSITORY)
sync_pr = sync_repo.get_pull(self.number)
# Find the commit that is in both repos, upstream and cloud
sync_commits = sync_pr.get_commits().reversed
upstream_commits = upstream_pr.get_commits().reversed
# Github objects are compared by _url attribute. We can't compare them directly and
# should compare commits by SHA1
upstream_shas = [c.sha for c in upstream_commits]
logging.info("Commits in upstream PR:\n %s", ", ".join(upstream_shas))
sync_shas = [c.sha for c in sync_commits]
logging.info("Commits in sync PR:\n %s", ", ".join(reversed(sync_shas)))
# find latest synced commit
last_synced_upstream_commit = None
for commit in upstream_commits:
if commit.sha in sync_shas:
last_synced_upstream_commit = commit
break
assert last_synced_upstream_commit
return last_synced_upstream_commit
class FakePRInfo:
def __init__(self):

View File

@ -104,7 +104,7 @@ class S3Helper:
self.client.upload_file(file_path, bucket_name, s3_path, ExtraArgs=metadata)
url = self.s3_url(bucket_name, s3_path)
logging.info("Upload %s to %s. Meta: %s", file_path, url, metadata)
logging.info("Upload %s to %s Meta: %s", file_path, url, metadata)
return url
def delete_file_from_s3(self, bucket_name: str, s3_path: str) -> None:

Some files were not shown because too many files have changed in this diff.