Merge branch 'master' into sensitive_data_masker_unittest_issue

Suzy Wang 2022-09-09 22:59:50 -04:00 committed by GitHub
commit d60340eb40
78 changed files with 1250 additions and 433 deletions


@ -349,6 +349,100 @@ jobs:
# shellcheck disable=SC2046
docker rm -f $(docker ps -a -q) ||:
sudo rm -fr "$TEMP_PATH" "$CACHES_PATH"
BuilderBinDarwin:
needs: [DockerHubPush]
runs-on: [self-hosted, builder]
steps:
- name: Set envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'
TEMP_PATH=${{runner.temp}}/build_check
IMAGES_PATH=${{runner.temp}}/images_path
REPO_COPY=${{runner.temp}}/build_check/ClickHouse
CACHES_PATH=${{runner.temp}}/../ccaches
BUILD_NAME=binary_darwin
EOF
- name: Download changed images
uses: actions/download-artifact@v2
with:
name: changed_images
path: ${{ env.IMAGES_PATH }}
- name: Clear repository
run: |
sudo rm -fr "$GITHUB_WORKSPACE" && mkdir "$GITHUB_WORKSPACE"
- name: Check out repository code
uses: actions/checkout@v2
with:
fetch-depth: 0 # otherwise we will have no info about contributors
- name: Build
run: |
git -C "$GITHUB_WORKSPACE" submodule sync --recursive
git -C "$GITHUB_WORKSPACE" submodule update --depth=1 --recursive --init --jobs=10
sudo rm -fr "$TEMP_PATH"
mkdir -p "$TEMP_PATH"
cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
cd "$REPO_COPY/tests/ci" && python3 build_check.py "$BUILD_NAME"
- name: Upload build URLs to artifacts
if: ${{ success() || failure() }}
uses: actions/upload-artifact@v2
with:
name: ${{ env.BUILD_URLS }}
path: ${{ env.TEMP_PATH }}/${{ env.BUILD_URLS }}.json
- name: Cleanup
if: always()
run: |
# shellcheck disable=SC2046
docker kill $(docker ps -q) ||:
# shellcheck disable=SC2046
docker rm -f $(docker ps -a -q) ||:
sudo rm -fr "$TEMP_PATH" "$CACHES_PATH"
BuilderBinDarwinAarch64:
needs: [DockerHubPush]
runs-on: [self-hosted, builder]
steps:
- name: Set envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'
TEMP_PATH=${{runner.temp}}/build_check
IMAGES_PATH=${{runner.temp}}/images_path
REPO_COPY=${{runner.temp}}/build_check/ClickHouse
CACHES_PATH=${{runner.temp}}/../ccaches
BUILD_NAME=binary_darwin_aarch64
EOF
- name: Download changed images
uses: actions/download-artifact@v2
with:
name: changed_images
path: ${{ env.IMAGES_PATH }}
- name: Clear repository
run: |
sudo rm -fr "$GITHUB_WORKSPACE" && mkdir "$GITHUB_WORKSPACE"
- name: Check out repository code
uses: actions/checkout@v2
with:
fetch-depth: 0 # otherwise we will have no info about contributors
- name: Build
run: |
git -C "$GITHUB_WORKSPACE" submodule sync --recursive
git -C "$GITHUB_WORKSPACE" submodule update --depth=1 --recursive --init --jobs=10
sudo rm -fr "$TEMP_PATH"
mkdir -p "$TEMP_PATH"
cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
cd "$REPO_COPY/tests/ci" && python3 build_check.py "$BUILD_NAME"
- name: Upload build URLs to artifacts
if: ${{ success() || failure() }}
uses: actions/upload-artifact@v2
with:
name: ${{ env.BUILD_URLS }}
path: ${{ env.TEMP_PATH }}/${{ env.BUILD_URLS }}.json
- name: Cleanup
if: always()
run: |
# shellcheck disable=SC2046
docker kill $(docker ps -q) ||:
# shellcheck disable=SC2046
docker rm -f $(docker ps -a -q) ||:
sudo rm -fr "$TEMP_PATH" "$CACHES_PATH"
############################################################################################
##################################### Docker images #######################################
############################################################################################
@ -425,6 +519,46 @@ jobs:
# shellcheck disable=SC2046
docker rm -f $(docker ps -a -q) ||:
sudo rm -fr "$TEMP_PATH"
BuilderSpecialReport:
needs:
- BuilderBinDarwin
- BuilderBinDarwinAarch64
runs-on: [self-hosted, style-checker]
steps:
- name: Set envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'
TEMP_PATH=${{runner.temp}}/report_check
REPORTS_PATH=${{runner.temp}}/reports_dir
CHECK_NAME=ClickHouse special build check
NEEDS_DATA_PATH=${{runner.temp}}/needs.json
EOF
- name: Download json reports
uses: actions/download-artifact@v2
with:
path: ${{ env.REPORTS_PATH }}
- name: Clear repository
run: |
sudo rm -fr "$GITHUB_WORKSPACE" && mkdir "$GITHUB_WORKSPACE"
- name: Check out repository code
uses: actions/checkout@v2
- name: Report Builder
run: |
sudo rm -fr "$TEMP_PATH"
mkdir -p "$TEMP_PATH"
cat > "$NEEDS_DATA_PATH" << 'EOF'
${{ toJSON(needs) }}
EOF
cd "$GITHUB_WORKSPACE/tests/ci"
python3 build_report_check.py "$CHECK_NAME"
- name: Cleanup
if: always()
run: |
# shellcheck disable=SC2046
docker kill $(docker ps -q) ||:
# shellcheck disable=SC2046
docker rm -f $(docker ps -a -q) ||:
sudo rm -fr "$TEMP_PATH"
##############################################################################################
########################### FUNCTIONAl STATELESS TESTS #######################################
##############################################################################################
@ -592,6 +726,7 @@ jobs:
- DockerHubPush
- DockerServerImages
- BuilderReport
- BuilderSpecialReport
- FunctionalStatelessTestAsan
- FunctionalStatefulTestDebug
- StressTestTsan


@ -29,8 +29,12 @@ jobs:
rm -rf "$TEMP_PATH" && mkdir -p "$TEMP_PATH"
cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
cd "$REPO_COPY"
# Download and push packages to artifactory
python3 ./tests/ci/push_to_artifactory.py --release "${{ github.ref }}" \
--commit '${{ github.sha }}' --artifactory-url "${{ secrets.JFROG_ARTIFACTORY_URL }}" --all
# Download macos binaries to ${{runner.temp}}/download_binary
python3 ./tests/ci/download_binary.py binary_darwin binary_darwin_aarch64
mv '${{runner.temp}}/download_binary/'clickhouse-* '${{runner.temp}}/push_to_artifactory'
- name: Upload packages to release assets
uses: svenstaro/upload-release-action@v2
with:


@ -426,6 +426,100 @@ jobs:
# shellcheck disable=SC2046
docker rm -f $(docker ps -a -q) ||:
sudo rm -fr "$TEMP_PATH" "$CACHES_PATH"
BuilderBinDarwin:
needs: [DockerHubPush]
runs-on: [self-hosted, builder]
steps:
- name: Set envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'
TEMP_PATH=${{runner.temp}}/build_check
IMAGES_PATH=${{runner.temp}}/images_path
REPO_COPY=${{runner.temp}}/build_check/ClickHouse
CACHES_PATH=${{runner.temp}}/../ccaches
BUILD_NAME=binary_darwin
EOF
- name: Download changed images
uses: actions/download-artifact@v2
with:
name: changed_images
path: ${{ env.IMAGES_PATH }}
- name: Clear repository
run: |
sudo rm -fr "$GITHUB_WORKSPACE" && mkdir "$GITHUB_WORKSPACE"
- name: Check out repository code
uses: actions/checkout@v2
with:
fetch-depth: 0 # otherwise we will have no info about contributors
- name: Build
run: |
git -C "$GITHUB_WORKSPACE" submodule sync --recursive
git -C "$GITHUB_WORKSPACE" submodule update --depth=1 --recursive --init --jobs=10
sudo rm -fr "$TEMP_PATH"
mkdir -p "$TEMP_PATH"
cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
cd "$REPO_COPY/tests/ci" && python3 build_check.py "$BUILD_NAME"
- name: Upload build URLs to artifacts
if: ${{ success() || failure() }}
uses: actions/upload-artifact@v2
with:
name: ${{ env.BUILD_URLS }}
path: ${{ env.TEMP_PATH }}/${{ env.BUILD_URLS }}.json
- name: Cleanup
if: always()
run: |
# shellcheck disable=SC2046
docker kill $(docker ps -q) ||:
# shellcheck disable=SC2046
docker rm -f $(docker ps -a -q) ||:
sudo rm -fr "$TEMP_PATH" "$CACHES_PATH"
BuilderBinDarwinAarch64:
needs: [DockerHubPush]
runs-on: [self-hosted, builder]
steps:
- name: Set envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'
TEMP_PATH=${{runner.temp}}/build_check
IMAGES_PATH=${{runner.temp}}/images_path
REPO_COPY=${{runner.temp}}/build_check/ClickHouse
CACHES_PATH=${{runner.temp}}/../ccaches
BUILD_NAME=binary_darwin_aarch64
EOF
- name: Download changed images
uses: actions/download-artifact@v2
with:
name: changed_images
path: ${{ env.IMAGES_PATH }}
- name: Clear repository
run: |
sudo rm -fr "$GITHUB_WORKSPACE" && mkdir "$GITHUB_WORKSPACE"
- name: Check out repository code
uses: actions/checkout@v2
with:
fetch-depth: 0 # otherwise we will have no info about contributors
- name: Build
run: |
git -C "$GITHUB_WORKSPACE" submodule sync --recursive
git -C "$GITHUB_WORKSPACE" submodule update --depth=1 --recursive --init --jobs=10
sudo rm -fr "$TEMP_PATH"
mkdir -p "$TEMP_PATH"
cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
cd "$REPO_COPY/tests/ci" && python3 build_check.py "$BUILD_NAME"
- name: Upload build URLs to artifacts
if: ${{ success() || failure() }}
uses: actions/upload-artifact@v2
with:
name: ${{ env.BUILD_URLS }}
path: ${{ env.TEMP_PATH }}/${{ env.BUILD_URLS }}.json
- name: Cleanup
if: always()
run: |
# shellcheck disable=SC2046
docker kill $(docker ps -q) ||:
# shellcheck disable=SC2046
docker rm -f $(docker ps -a -q) ||:
sudo rm -fr "$TEMP_PATH" "$CACHES_PATH"
############################################################################################
##################################### Docker images #######################################
############################################################################################
@ -505,6 +599,46 @@ jobs:
# shellcheck disable=SC2046
docker rm -f $(docker ps -a -q) ||:
sudo rm -fr "$TEMP_PATH"
BuilderSpecialReport:
needs:
- BuilderBinDarwin
- BuilderBinDarwinAarch64
runs-on: [self-hosted, style-checker]
steps:
- name: Set envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'
TEMP_PATH=${{runner.temp}}/report_check
REPORTS_PATH=${{runner.temp}}/reports_dir
CHECK_NAME=ClickHouse special build check
NEEDS_DATA_PATH=${{runner.temp}}/needs.json
EOF
- name: Download json reports
uses: actions/download-artifact@v2
with:
path: ${{ env.REPORTS_PATH }}
- name: Clear repository
run: |
sudo rm -fr "$GITHUB_WORKSPACE" && mkdir "$GITHUB_WORKSPACE"
- name: Check out repository code
uses: actions/checkout@v2
- name: Report Builder
run: |
sudo rm -fr "$TEMP_PATH"
mkdir -p "$TEMP_PATH"
cat > "$NEEDS_DATA_PATH" << 'EOF'
${{ toJSON(needs) }}
EOF
cd "$GITHUB_WORKSPACE/tests/ci"
python3 build_report_check.py "$CHECK_NAME"
- name: Cleanup
if: always()
run: |
# shellcheck disable=SC2046
docker kill $(docker ps -q) ||:
# shellcheck disable=SC2046
docker rm -f $(docker ps -a -q) ||:
sudo rm -fr "$TEMP_PATH"
##############################################################################################
########################### FUNCTIONAl STATELESS TESTS #######################################
##############################################################################################
@ -1847,6 +1981,7 @@ jobs:
- DockerHubPush
- DockerServerImages
- BuilderReport
- BuilderSpecialReport
- FunctionalStatelessTestDebug0
- FunctionalStatelessTestDebug1
- FunctionalStatelessTestDebug2


@ -1,8 +1,15 @@
#!/bin/bash
# shellcheck disable=SC2086,SC2001,SC2046,SC2030,SC2031
set -eux
set -x
# core.COMM.PID-TID
sysctl kernel.core_pattern='core.%e.%p-%P'
set -e
set -u
set -o pipefail
trap "exit" INT TERM
# The watchdog is in the separate process group, so we have to kill it separately
# if the script terminates earlier.
@ -87,6 +94,19 @@ function configure
# TODO figure out which ones are needed
cp -av --dereference "$repo_dir"/tests/config/config.d/listen.xml db/config.d
cp -av --dereference "$script_dir"/query-fuzzer-tweaks-users.xml db/users.d
cat > db/config.d/core.xml <<EOL
<clickhouse>
<core_dump>
<!-- 100GiB -->
<size_limit>107374182400</size_limit>
</core_dump>
<!-- NOTE: no need to configure core_path,
since clickhouse is not started as daemon (via clickhouse start)
-->
<core_path>$PWD</core_path>
</clickhouse>
EOL
}
function watchdog
@ -180,7 +200,6 @@ handle SIGUSR2 nostop noprint pass
handle SIG$RTMIN nostop noprint pass
info signals
continue
gcore
backtrace full
thread apply all backtrace full
info registers


@ -8,6 +8,9 @@ dmesg --clear
set -x
# core.COMM.PID-TID
sysctl kernel.core_pattern='core.%e.%p-%P'
# Thread Fuzzer allows to check more permutations of possible thread scheduling
# and find more potential issues.
@ -104,6 +107,19 @@ EOL
</default>
</profiles>
</clickhouse>
EOL
cat > /etc/clickhouse-server/config.d/core.xml <<EOL
<clickhouse>
<core_dump>
<!-- 100GiB -->
<size_limit>107374182400</size_limit>
</core_dump>
<!-- NOTE: no need to configure core_path,
since clickhouse is not started as daemon (via clickhouse start)
-->
<core_path>$PWD</core_path>
</clickhouse>
EOL
}
@ -160,7 +176,6 @@ handle SIGUSR2 nostop noprint pass
handle SIG$RTMIN nostop noprint pass
info signals
continue
gcore
backtrace full
thread apply all backtrace full
info registers
@ -504,8 +519,7 @@ done
clickhouse-local --structure "test String, res String" -q "SELECT 'failure', test FROM table WHERE res != 'OK' order by (lower(test) like '%hung%'), rowNumberInAllBlocks() LIMIT 1" < /test_output/test_results.tsv > /test_output/check_status.tsv
[ -s /test_output/check_status.tsv ] || echo -e "success\tNo errors found" > /test_output/check_status.tsv
# Core dumps (see gcore)
# Core dumps
# Default filename is 'core.PROCESS_ID'
for core in core.*; do
pigz $core
mv $core.gz /test_output/


@ -4,10 +4,9 @@ sidebar_position: 1
keywords: [clickhouse, install, installation, docs]
description: ClickHouse can run on any Linux, FreeBSD, or Mac OS X with x86_64, AArch64, or PowerPC64LE CPU architecture.
slug: /en/getting-started/install
title: Installation
---
# Installation
## System Requirements {#system-requirements}
ClickHouse can run on any Linux, FreeBSD, or Mac OS X with x86_64, AArch64, or PowerPC64LE CPU architecture.


@ -12,8 +12,9 @@ ALTER TABLE [db.]table [ON CLUSTER cluster] DELETE WHERE filter_expr
Deletes data matching the specified filtering expression. Implemented as a [mutation](../../../sql-reference/statements/alter/index.md#mutations).
:::note
The `ALTER TABLE` prefix makes this syntax different from most other systems supporting SQL. It is intended to signify that unlike similar queries in OLTP databases this is a heavy operation not designed for frequent use.
:::note
The `ALTER TABLE` prefix makes this syntax different from most other systems supporting SQL. It is intended to signify that unlike similar queries in OLTP databases this is a heavy operation not designed for frequent use. `ALTER TABLE` is considered a heavyweight operation that requires the underlying data to be merged before it is deleted. For MergeTree tables, consider using the [`DELETE FROM` query](../delete.md), which performs a lightweight delete and can be considerably faster.
:::
The `filter_expr` must be of type `UInt8`. The query deletes rows in the table for which this expression takes a non-zero value.
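A minimal illustration of the syntax above; the table name `visits` and the condition are hypothetical and not part of this commit:

```sql
-- Mutation-based delete: rewrites the affected parts asynchronously.
-- The condition evaluates to UInt8, as required for filter_expr.
ALTER TABLE visits DELETE WHERE duration < 5;
```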


@ -0,0 +1,37 @@
---
slug: /en/sql-reference/statements/delete
sidebar_position: 36
sidebar_label: DELETE
---
# DELETE Statement
``` sql
DELETE FROM [db.]table [WHERE expr]
```
`DELETE FROM` removes rows from the table `[db.]table` that match the expression `expr`. The deleted rows are marked as deleted immediately and are automatically filtered out of all subsequent queries. Cleanup of the data happens asynchronously in the background. This feature is only available for the MergeTree table engine family.
For example, the following query deletes all rows from the `hits` table where the `Title` column contains the text `hello`:
```sql
DELETE FROM hits WHERE Title LIKE '%hello%';
```
Lightweight deletes are asynchronous by default. Set `mutations_sync` equal to 1 to wait for one replica to process the statement, and set `mutations_sync` to 2 to wait for all replicas.
:::note
This feature is experimental and requires you to set `allow_experimental_lightweight_delete` to true:
```sql
SET allow_experimental_lightweight_delete = true;
```
:::
An [alternative way to delete rows](./alter/delete.md) in ClickHouse is `ALTER TABLE ... DELETE`, which might be more efficient if you do bulk deletes only occasionally and don't need the operation to be applied instantly. In most use cases the new lightweight `DELETE FROM` behavior will be considerably faster.
:::warning
Even though deletes are becoming more lightweight in ClickHouse, they should still not be used as aggressively as in an OLTP system. Lightweight deletes are currently efficient for wide parts, but for compact parts they can be a heavyweight operation, and it may be better to use `ALTER TABLE` for some scenarios.
:::
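A hedged sketch that combines the settings described above with the documented `hits` example; the exact settings you need depend on your server configuration:

```sql
-- Enable the experimental lightweight delete, then wait for all replicas.
SET allow_experimental_lightweight_delete = true;
SET mutations_sync = 2;
DELETE FROM hits WHERE Title LIKE '%hello%';
```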


@ -10,7 +10,7 @@ Makes the server "forget" about the existence of a table, a materialized view, o
**Syntax**
``` sql
DETACH TABLE|VIEW|DICTIONARY [IF EXISTS] [db.]name [ON CLUSTER cluster] [PERMANENTLY]
DETACH TABLE|VIEW|DICTIONARY [IF EXISTS] [db.]name [ON CLUSTER cluster] [PERMANENTLY] [SYNC]
```
Detaching does not delete the data or metadata of a table, a materialized view or a dictionary. If an entity was not detached `PERMANENTLY`, on the next server launch the server will read the metadata and recall the table/view/dictionary again. If an entity was detached `PERMANENTLY`, there will be no automatic recall.
@ -24,6 +24,8 @@ Note that you can not detach permanently the table which is already detached (te
Also you can not [DROP](../../sql-reference/statements/drop#drop-table) the detached table, or [CREATE TABLE](../../sql-reference/statements/create/table.md) with the same name as detached permanently, or replace it with the other table with [RENAME TABLE](../../sql-reference/statements/rename.md) query.
The `SYNC` modifier executes the action without delay.
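As a short, hypothetical illustration (the table name `events` is not from this commit), the new modifier can be combined with the existing options shown in the syntax above:

```sql
-- Detach without delay; SYNC may be combined with PERMANENTLY per the syntax above.
DETACH TABLE IF EXISTS events SYNC;
```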
**Example**
Creating a table:


@ -6,7 +6,7 @@ sidebar_label: DROP
# DROP Statements
Deletes existing entity. If the `IF EXISTS` clause is specified, these queries do not return an error if the entity does not exist.
Deletes existing entity. If the `IF EXISTS` clause is specified, these queries do not return an error if the entity does not exist. If the `SYNC` modifier is specified, the entity is dropped without delay.
## DROP DATABASE
@ -15,7 +15,7 @@ Deletes all tables inside the `db` database, then deletes the `db` database itse
Syntax:
``` sql
DROP DATABASE [IF EXISTS] db [ON CLUSTER cluster]
DROP DATABASE [IF EXISTS] db [ON CLUSTER cluster] [SYNC]
```
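For example, a hypothetical database `staging` (not part of this commit) could be dropped without delay using the new modifier:

```sql
-- SYNC makes the drop take effect immediately rather than being deferred.
DROP DATABASE IF EXISTS staging SYNC;
```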
## DROP TABLE
@ -25,7 +25,7 @@ Deletes the table.
Syntax:
``` sql
DROP [TEMPORARY] TABLE [IF EXISTS] [db.]name [ON CLUSTER cluster]
DROP [TEMPORARY] TABLE [IF EXISTS] [db.]name [ON CLUSTER cluster] [SYNC]
```
## DROP DICTIONARY
@ -35,7 +35,7 @@ Deletes the dictionary.
Syntax:
``` sql
DROP DICTIONARY [IF EXISTS] [db.]name
DROP DICTIONARY [IF EXISTS] [db.]name [SYNC]
```
## DROP USER
@ -95,7 +95,7 @@ Deletes a view. Views can be deleted by a `DROP TABLE` command as well but `DROP
Syntax:
``` sql
DROP VIEW [IF EXISTS] [db.]name [ON CLUSTER cluster]
DROP VIEW [IF EXISTS] [db.]name [ON CLUSTER cluster] [SYNC]
```
## DROP FUNCTION


@ -1,5 +1,5 @@
---
slug: /en/development/tests
slug: /zh/development/tests
sidebar_position: 70
sidebar_label: Testing
title: ClickHouse Testing


@ -0,0 +1,37 @@
#pragma once
#include <Backups/IBackupEntry.h>
namespace DB
{
/// Wraps another backup entry and a value of any type.
template <typename T>
class BackupEntryWrappedWith : public IBackupEntry
{
public:
BackupEntryWrappedWith(BackupEntryPtr entry_, const T & custom_value_) : entry(entry_), custom_value(custom_value_) { }
BackupEntryWrappedWith(BackupEntryPtr entry_, T && custom_value_) : entry(entry_), custom_value(std::move(custom_value_)) { }
~BackupEntryWrappedWith() override = default;
UInt64 getSize() const override { return entry->getSize(); }
std::optional<UInt128> getChecksum() const override { return entry->getChecksum(); }
std::unique_ptr<SeekableReadBuffer> getReadBuffer() const override { return entry->getReadBuffer(); }
String getFilePath() const override { return entry->getFilePath(); }
DiskPtr tryGetDiskIfExists() const override { return entry->tryGetDiskIfExists(); }
DataSourceDescription getDataSourceDescription() const override { return entry->getDataSourceDescription(); }
private:
BackupEntryPtr entry;
T custom_value;
};
template <typename T>
void wrapBackupEntriesWith(std::vector<std::pair<String, BackupEntryPtr>> & backup_entries, const T & custom_value)
{
for (auto & [_, backup_entry] : backup_entries)
backup_entry = std::make_shared<BackupEntryWrappedWith<T>>(std::move(backup_entry), custom_value);
}
}


@ -50,7 +50,7 @@ ColumnArray::ColumnArray(MutableColumnPtr && nested_column, MutableColumnPtr &&
if (!offsets_concrete)
throw Exception("offsets_column must be a ColumnUInt64", ErrorCodes::LOGICAL_ERROR);
if (!offsets_concrete->empty() && data)
if (!offsets_concrete->empty() && data && !data->empty())
{
Offset last_offset = offsets_concrete->getData().back();


@ -898,4 +898,25 @@ ZooKeeperRequestFactory::ZooKeeperRequestFactory()
registerZooKeeperRequest<OpNum::FilteredList, ZooKeeperFilteredListRequest>(*this);
}
PathMatchResult matchPath(std::string_view path, std::string_view match_to)
{
using enum PathMatchResult;
if (path.ends_with('/'))
path.remove_suffix(1);
if (match_to.ends_with('/'))
match_to.remove_suffix(1);
auto [first_it, second_it] = std::mismatch(path.begin(), path.end(), match_to.begin(), match_to.end());
if (second_it != match_to.end())
return NOT_MATCH;
if (first_it == path.end())
return EXACT;
return *first_it == '/' ? IS_CHILD : NOT_MATCH;
}
}


@ -554,4 +554,13 @@ private:
ZooKeeperRequestFactory();
};
enum class PathMatchResult
{
NOT_MATCH,
EXACT,
IS_CHILD
};
PathMatchResult matchPath(std::string_view path, std::string_view match_to);
}


@ -0,0 +1,15 @@
#include <gtest/gtest.h>
#include <Common/ZooKeeper/ZooKeeperCommon.h>
TEST(ZooKeeperTest, TestMatchPath)
{
using namespace Coordination;
ASSERT_EQ(matchPath("/path/file", "/path"), PathMatchResult::IS_CHILD);
ASSERT_EQ(matchPath("/path/file", "/path/"), PathMatchResult::IS_CHILD);
ASSERT_EQ(matchPath("/path/file", "/"), PathMatchResult::IS_CHILD);
ASSERT_EQ(matchPath("/", "/"), PathMatchResult::EXACT);
ASSERT_EQ(matchPath("/path", "/path/"), PathMatchResult::EXACT);
ASSERT_EQ(matchPath("/path/", "/path"), PathMatchResult::EXACT);
}


@ -116,8 +116,8 @@ ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(
}
};
ISerialization::SubstreamPath path;
auto serialization = column_type->getDefaultSerialization();
column_type->getDefaultSerialization()->enumerateStreams(path, callback, column_type);
serialization->enumerateStreams(callback, column_type);
if (!result_codec)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot find any substream with data type for type {}. It's a bug", column_type->getName());


@ -13,8 +13,10 @@
#include <filesystem>
#include <memory>
#include <Common/logger_useful.h>
#include "Coordination/KeeperContext.h"
#include <Coordination/KeeperContext.h>
#include <Coordination/KeeperConstants.h>
#include <Common/ZooKeeper/ZooKeeperCommon.h>
namespace DB
{
@ -146,33 +148,6 @@ namespace
}
}
namespace
{
enum class PathMatchResult
{
NOT_MATCH,
EXACT,
IS_CHILD
};
PathMatchResult matchPath(const std::string_view path, const std::string_view match_to)
{
using enum PathMatchResult;
auto [first_it, second_it] = std::mismatch(path.begin(), path.end(), match_to.begin(), match_to.end());
if (second_it != match_to.end())
return NOT_MATCH;
if (first_it == path.end())
return EXACT;
return *first_it == '/' ? IS_CHILD : NOT_MATCH;
}
}
void KeeperStorageSnapshot::serialize(const KeeperStorageSnapshot & snapshot, WriteBuffer & out, KeeperContextPtr keeper_context)
{
writeBinary(static_cast<uint8_t>(snapshot.version), out);
@ -217,7 +192,7 @@ void KeeperStorageSnapshot::serialize(const KeeperStorageSnapshot & snapshot, Wr
const auto & path = it->key;
// write only the root system path because of digest
if (matchPath(path.toView(), keeper_system_path) == PathMatchResult::IS_CHILD)
if (Coordination::matchPath(path.toView(), keeper_system_path) == Coordination::PathMatchResult::IS_CHILD)
{
++it;
continue;
@ -365,8 +340,8 @@ void KeeperStorageSnapshot::deserialize(SnapshotDeserializationResult & deserial
KeeperStorage::Node node{};
readNode(node, in, current_version, storage.acl_map);
using enum PathMatchResult;
using enum Coordination::PathMatchResult;
auto match_result = matchPath(path, keeper_system_path);
auto match_result = Coordination::matchPath(path, keeper_system_path);
const std::string error_msg = fmt::format("Cannot read node on path {} from a snapshot because it is used as a system node", path);
if (match_result == IS_CHILD)


@ -879,7 +879,7 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr
path_created += seq_num_str.str();
}
if (path_created.starts_with(keeper_system_path))
if (Coordination::matchPath(path_created, keeper_system_path) != Coordination::PathMatchResult::NOT_MATCH)
{
auto error_msg = fmt::format("Trying to create a node inside the internal Keeper path ({}) which is not allowed. Path: {}", keeper_system_path, path_created);
@ -1049,7 +1049,7 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr
std::vector<KeeperStorage::Delta> new_deltas;
if (request.path.starts_with(keeper_system_path))
if (Coordination::matchPath(request.path, keeper_system_path) != Coordination::PathMatchResult::NOT_MATCH)
{
auto error_msg = fmt::format("Trying to delete an internal Keeper path ({}) which is not allowed", request.path);
@ -1203,7 +1203,7 @@ struct KeeperStorageSetRequestProcessor final : public KeeperStorageRequestProce
std::vector<KeeperStorage::Delta> new_deltas;
if (request.path.starts_with(keeper_system_path))
if (Coordination::matchPath(request.path, keeper_system_path) != Coordination::PathMatchResult::NOT_MATCH)
{
auto error_msg = fmt::format("Trying to update an internal Keeper path ({}) which is not allowed", request.path);
@ -1472,7 +1472,7 @@ struct KeeperStorageSetACLRequestProcessor final : public KeeperStorageRequestPr
{
Coordination::ZooKeeperSetACLRequest & request = dynamic_cast<Coordination::ZooKeeperSetACLRequest &>(*zk_request);
if (request.path.starts_with(keeper_system_path))
if (Coordination::matchPath(request.path, keeper_system_path) != Coordination::PathMatchResult::NOT_MATCH)
{
auto error_msg = fmt::format("Trying to update an internal Keeper path ({}) which is not allowed", request.path);


@ -2141,6 +2141,38 @@ TEST_P(CoordinationTest, TestCurrentApiVersion)
EXPECT_EQ(keeper_version, static_cast<uint8_t>(current_keeper_api_version));
}
TEST_P(CoordinationTest, TestSystemNodeModify)
{
using namespace Coordination;
int64_t zxid{0};
// On INIT we abort when a system path is modified
keeper_context->server_state = KeeperContext::Phase::RUNNING;
KeeperStorage storage{500, "", keeper_context};
const auto assert_create = [&](const std::string_view path, const auto expected_code)
{
auto request = std::make_shared<ZooKeeperCreateRequest>();
request->path = path;
storage.preprocessRequest(request, 0, 0, zxid);
auto responses = storage.processRequest(request, 0, zxid);
ASSERT_FALSE(responses.empty());
const auto & response = responses[0];
ASSERT_EQ(response.response->error, expected_code) << "Unexpected error for path " << path;
++zxid;
};
assert_create("/keeper", Error::ZBADARGUMENTS);
assert_create("/keeper/with_child", Error::ZBADARGUMENTS);
assert_create(DB::keeper_api_version_path, Error::ZBADARGUMENTS);
assert_create("/keeper_map", Error::ZOK);
assert_create("/keeper1", Error::ZOK);
assert_create("/keepe", Error::ZOK);
assert_create("/keeper1/test", Error::ZOK);
}
INSTANTIATE_TEST_SUITE_P(CoordinationTestSuite,
CoordinationTest,
::testing::ValuesIn(std::initializer_list<CompressionParam>{


@ -84,18 +84,20 @@ void IDataType::forEachSubcolumn(
{
for (size_t i = 0; i < subpath.size(); ++i)
{
if (!subpath[i].visited && ISerialization::hasSubcolumnForPath(subpath, i + 1))
size_t prefix_len = i + 1;
if (!subpath[i].visited && ISerialization::hasSubcolumnForPath(subpath, prefix_len))
{
auto name = ISerialization::getSubcolumnNameForStream(subpath, i + 1);
auto name = ISerialization::getSubcolumnNameForStream(subpath, prefix_len);
auto subdata = ISerialization::createFromPath(subpath, i);
auto subdata = ISerialization::createFromPath(subpath, prefix_len);
callback(subpath, name, subdata);
}
subpath[i].visited = true;
}
};
SubstreamPath path;
ISerialization::EnumerateStreamsSettings settings;
data.serialization->enumerateStreams(path, callback_with_data, data);
settings.position_independent_encoding = false;
data.serialization->enumerateStreams(settings, callback_with_data, data);
}
template <typename Ptr>
@ -118,33 +120,38 @@ Ptr IDataType::getForSubcolumn(
return res;
}
bool IDataType::hasSubcolumn(const String & subcolumn_name) const
{
return tryGetSubcolumnType(subcolumn_name) != nullptr;
}
DataTypePtr IDataType::tryGetSubcolumnType(const String & subcolumn_name) const
{
SubstreamData data = { getDefaultSerialization(), getPtr(), nullptr, nullptr };
auto data = SubstreamData(getDefaultSerialization()).withType(getPtr());
return getForSubcolumn<DataTypePtr>(subcolumn_name, data, &SubstreamData::type, false);
}
DataTypePtr IDataType::getSubcolumnType(const String & subcolumn_name) const
{
SubstreamData data = { getDefaultSerialization(), getPtr(), nullptr, nullptr };
auto data = SubstreamData(getDefaultSerialization()).withType(getPtr());
return getForSubcolumn<DataTypePtr>(subcolumn_name, data, &SubstreamData::type, true);
}
ColumnPtr IDataType::tryGetSubcolumn(const String & subcolumn_name, const ColumnPtr & column) const
{
SubstreamData data = { getDefaultSerialization(), nullptr, column, nullptr };
auto data = SubstreamData(getDefaultSerialization()).withColumn(column);
return getForSubcolumn<ColumnPtr>(subcolumn_name, data, &SubstreamData::column, false);
}
ColumnPtr IDataType::getSubcolumn(const String & subcolumn_name, const ColumnPtr & column) const
{
SubstreamData data = { getDefaultSerialization(), nullptr, column, nullptr };
auto data = SubstreamData(getDefaultSerialization()).withColumn(column);
return getForSubcolumn<ColumnPtr>(subcolumn_name, data, &SubstreamData::column, true);
}
SerializationPtr IDataType::getSubcolumnSerialization(const String & subcolumn_name, const SerializationPtr & serialization) const
{
SubstreamData data = { serialization, nullptr, nullptr, nullptr };
auto data = SubstreamData(serialization);
return getForSubcolumn<SerializationPtr>(subcolumn_name, data, &SubstreamData::serialization, true);
}
@ -154,7 +161,7 @@ Names IDataType::getSubcolumnNames() const
forEachSubcolumn([&](const auto &, const auto & name, const auto &)
{
res.push_back(name);
}, { getDefaultSerialization(), nullptr, nullptr, nullptr });
}, SubstreamData(getDefaultSerialization()));
return res;
}


@ -79,6 +79,8 @@ public:
/// Data type id. It's used for runtime type checks.
virtual TypeIndex getTypeId() const = 0;
bool hasSubcolumn(const String & subcolumn_name) const;
DataTypePtr tryGetSubcolumnType(const String & subcolumn_name) const;
DataTypePtr getSubcolumnType(const String & subcolumn_name) const;


@ -73,24 +73,24 @@ String ISerialization::SubstreamPath::toString() const
}
void ISerialization::enumerateStreams(
SubstreamPath & path,
EnumerateStreamsSettings & settings,
const StreamCallback & callback,
const SubstreamData & data) const
{
path.push_back(Substream::Regular);
settings.path.push_back(Substream::Regular);
path.back().data = data;
settings.path.back().data = data;
callback(path);
callback(settings.path);
path.pop_back();
settings.path.pop_back();
}
void ISerialization::enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const
void ISerialization::enumerateStreams(
const StreamCallback & callback,
const DataTypePtr & type,
const ColumnPtr & column) const
{
enumerateStreams(path, callback, {getPtr(), nullptr, nullptr, nullptr});
EnumerateStreamsSettings settings;
}
auto data = SubstreamData(getPtr()).withType(type).withColumn(column);
enumerateStreams(settings, callback, data);
void ISerialization::enumerateStreams(SubstreamPath & path, const StreamCallback & callback, const DataTypePtr & type) const
{
enumerateStreams(path, callback, {getPtr(), type, nullptr, nullptr});
}
void ISerialization::serializeBinaryBulk(const IColumn & column, WriteBuffer &, size_t, size_t) const
@ -184,7 +184,7 @@ String ISerialization::getFileNameForStream(const NameAndTypePair & column, cons
return getFileNameForStream(column.getNameInStorage(), path);
}
static size_t isOffsetsOfNested(const ISerialization::SubstreamPath & path)
bool isOffsetsOfNested(const ISerialization::SubstreamPath & path)
{
if (path.empty())
return false;
@ -287,10 +287,13 @@ bool ISerialization::hasSubcolumnForPath(const SubstreamPath & path, size_t pref
ISerialization::SubstreamData ISerialization::createFromPath(const SubstreamPath & path, size_t prefix_len)
{
assert(prefix_len < path.size());
assert(prefix_len <= path.size());
if (prefix_len == 0)
return {};
SubstreamData res = path[prefix_len].data;
ssize_t last_elem = prefix_len - 1;
for (ssize_t i = static_cast<ssize_t>(prefix_len) - 1; i >= 0; --i)
auto res = path[last_elem].data;
for (ssize_t i = last_elem - 1; i >= 0; --i)
{
const auto & creator = path[i].creator;
if (creator)


@ -101,6 +101,30 @@ public:
struct SubstreamData
{
SubstreamData() = default;
SubstreamData(SerializationPtr serialization_)
: serialization(std::move(serialization_))
{
}
SubstreamData & withType(DataTypePtr type_)
{
type = std::move(type_);
return *this;
}
SubstreamData & withColumn(ColumnPtr column_)
{
column = std::move(column_);
return *this;
}
SubstreamData & withSerializationInfo(SerializationInfoPtr serialization_info_)
{
serialization_info = std::move(serialization_info_);
return *this;
}
SerializationPtr serialization;
DataTypePtr type;
ColumnPtr column;
@ -164,16 +188,22 @@ public:
using StreamCallback = std::function<void(const SubstreamPath &)>;
struct EnumerateStreamsSettings
{
SubstreamPath path;
bool position_independent_encoding = true;
};
virtual void enumerateStreams(
SubstreamPath & path,
EnumerateStreamsSettings & settings,
const StreamCallback & callback,
const SubstreamData & data) const;
void enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const;
/// Enumerate streams with default settings.
void enumerateStreams(const StreamCallback & callback, SubstreamPath && path) const { enumerateStreams(callback, path); }
void enumerateStreams(
void enumerateStreams(const StreamCallback & callback) const { enumerateStreams(callback, {}); }
const StreamCallback & callback,
const DataTypePtr & type = nullptr,
void enumerateStreams(SubstreamPath & path, const StreamCallback & callback, const DataTypePtr & type) const;
const ColumnPtr & column = nullptr) const;
using OutputStreamGetter = std::function<WriteBuffer*(const SubstreamPath &)>;
using InputStreamGetter = std::function<ReadBuffer*(const SubstreamPath &)>;
@ -375,4 +405,6 @@ State * ISerialization::checkAndGetState(const StatePtr & state) const
return state_concrete;
}
bool isOffsetsOfNested(const ISerialization::SubstreamPath & path);
}


@ -155,30 +155,30 @@ namespace
return column_offsets;
}
}
ColumnPtr arrayOffsetsToSizes(const IColumn & column)
{
const auto & column_offsets = assert_cast<const ColumnArray::ColumnOffsets &>(column);
MutableColumnPtr column_sizes = column_offsets.cloneEmpty();
if (column_offsets.empty())
return column_sizes;
const auto & offsets_data = column_offsets.getData();
auto & sizes_data = assert_cast<ColumnArray::ColumnOffsets &>(*column_sizes).getData();
sizes_data.resize(offsets_data.size());
IColumn::Offset prev_offset = 0;
for (size_t i = 0, size = offsets_data.size(); i < size; ++i)
{
auto current_offset = offsets_data[i];
const auto & column_offsets = assert_cast<const ColumnArray::ColumnOffsets &>(column);
sizes_data[i] = current_offset - prev_offset;
MutableColumnPtr column_sizes = column_offsets.cloneEmpty();
prev_offset = current_offset;
}
return column_sizes;
if (column_offsets.empty())
return column_sizes;
const auto & offsets_data = column_offsets.getData();
auto & sizes_data = assert_cast<ColumnArray::ColumnOffsets &>(*column_sizes).getData();
sizes_data.resize(offsets_data.size());
IColumn::Offset prev_offset = 0;
for (size_t i = 0, size = offsets_data.size(); i < size; ++i)
{
auto current_offset = offsets_data[i];
sizes_data[i] = current_offset - prev_offset;
prev_offset = current_offset;
}
return column_sizes;
}
}
DataTypePtr SerializationArray::SubcolumnCreator::create(const DataTypePtr & prev) const
@ -197,41 +197,42 @@ ColumnPtr SerializationArray::SubcolumnCreator::create(const ColumnPtr & prev) c
}
void SerializationArray::enumerateStreams(
SubstreamPath & path,
EnumerateStreamsSettings & settings,
const StreamCallback & callback,
const SubstreamData & data) const
{
const auto * type_array = data.type ? &assert_cast<const DataTypeArray &>(*data.type) : nullptr;
const auto * column_array = data.column ? &assert_cast<const ColumnArray &>(*data.column) : nullptr;
auto offsets_column = column_array ? column_array->getOffsetsPtr() : nullptr;
auto offsets = column_array ? column_array->getOffsetsPtr() : nullptr;
path.push_back(Substream::ArraySizes);
auto offsets_serialization =
path.back().data =
{
std::make_shared<SerializationNamed>(
std::make_shared<SerializationNumber<UInt64>>(),
"size" + std::to_string(getArrayLevel(path)), false),
"size" + std::to_string(getArrayLevel(settings.path)), false);
data.type ? std::make_shared<DataTypeUInt64>() : nullptr,
offsets_column ? arrayOffsetsToSizes(*offsets_column) : nullptr,
data.serialization_info,
};
callback(path);
auto offsets_column = offsets && !settings.position_independent_encoding
? arrayOffsetsToSizes(*offsets)
: offsets;
path.back() = Substream::ArrayElements;
settings.path.push_back(Substream::ArraySizes);
path.back().data = data;
settings.path.back().data = SubstreamData(offsets_serialization)
path.back().creator = std::make_shared<SubcolumnCreator>(offsets_column);
.withType(type_array ? std::make_shared<DataTypeUInt64>() : nullptr)
.withColumn(std::move(offsets_column))
.withSerializationInfo(data.serialization_info);
SubstreamData next_data =
callback(settings.path);
{
nested,
type_array ? type_array->getNestedType() : nullptr,
column_array ? column_array->getDataPtr() : nullptr,
data.serialization_info,
};
nested->enumerateStreams(path, callback, next_data);
settings.path.back() = Substream::ArrayElements;
path.pop_back();
settings.path.back().data = data;
settings.path.back().creator = std::make_shared<SubcolumnCreator>(offsets);
auto next_data = SubstreamData(nested)
.withType(type_array ? type_array->getNestedType() : nullptr)
.withColumn(column_array ? column_array->getDataPtr() : nullptr)
.withSerializationInfo(data.serialization_info);
nested->enumerateStreams(settings, callback, next_data);
settings.path.pop_back();
}
void SerializationArray::serializeBinaryBulkStatePrefix(


@ -36,7 +36,7 @@ public:
*/
void enumerateStreams(
SubstreamPath & path,
EnumerateStreamsSettings & settings,
const StreamCallback & callback,
const SubstreamData & data) const override;
@ -79,6 +79,4 @@ private:
};
};
ColumnPtr arrayOffsetsToSizes(const IColumn & column);
}


@ -41,30 +41,26 @@ SerializationLowCardinality::SerializationLowCardinality(const DataTypePtr & dic
}
void SerializationLowCardinality::enumerateStreams(
SubstreamPath & path,
EnumerateStreamsSettings & settings,
const StreamCallback & callback,
const SubstreamData & data) const
{
const auto * column_lc = data.column ? &getColumnLowCardinality(*data.column) : nullptr;
SubstreamData dict_data =
settings.path.push_back(Substream::DictionaryKeys);
{
auto dict_data = SubstreamData(dict_inner_serialization)
dict_inner_serialization,
.withType(data.type ? dictionary_type : nullptr)
data.type ? dictionary_type : nullptr,
.withColumn(column_lc ? column_lc->getDictionary().getNestedColumn() : nullptr)
column_lc ? column_lc->getDictionary().getNestedColumn() : nullptr,
.withSerializationInfo(data.serialization_info);
data.serialization_info,
};
path.push_back(Substream::DictionaryKeys);
settings.path.back().data = dict_data;
path.back().data = dict_data;
dict_inner_serialization->enumerateStreams(settings, callback, dict_data);
dict_inner_serialization->enumerateStreams(path, callback, dict_data);
settings.path.back() = Substream::DictionaryIndexes;
settings.path.back().data = data;
path.back() = Substream::DictionaryIndexes;
callback(settings.path);
path.back().data = data;
settings.path.pop_back();
callback(path);
path.pop_back();
}
struct KeysSerializationVersion


@ -18,7 +18,7 @@ public:
explicit SerializationLowCardinality(const DataTypePtr & dictionary_type);
void enumerateStreams(
SubstreamPath & path,
EnumerateStreamsSettings & settings,
const StreamCallback & callback,
const SubstreamData & data) const override;


@ -257,19 +257,16 @@ void SerializationMap::deserializeTextCSV(IColumn & column, ReadBuffer & istr, c
}
void SerializationMap::enumerateStreams(
SubstreamPath & path,
EnumerateStreamsSettings & settings,
const StreamCallback & callback,
const SubstreamData & data) const
{
SubstreamData next_data =
auto next_data = SubstreamData(nested)
{
.withType(data.type ? assert_cast<const DataTypeMap &>(*data.type).getNestedType() : nullptr)
nested,
.withColumn(data.column ? assert_cast<const ColumnMap &>(*data.column).getNestedColumnPtr() : nullptr)
data.type ? assert_cast<const DataTypeMap &>(*data.type).getNestedType() : nullptr,
.withSerializationInfo(data.serialization_info);
data.column ? assert_cast<const ColumnMap &>(*data.column).getNestedColumnPtr() : nullptr,
data.serialization_info,
};
nested->enumerateStreams(path, callback, next_data);
nested->enumerateStreams(settings, callback, next_data);
}
void SerializationMap::serializeBinaryBulkStatePrefix(


@ -32,7 +32,7 @@ public:
void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override;
void enumerateStreams(
SubstreamPath & path,
EnumerateStreamsSettings & settings,
const StreamCallback & callback,
const SubstreamData & data) const override;


@ -4,16 +4,16 @@ namespace DB
{
void SerializationNamed::enumerateStreams(
SubstreamPath & path,
EnumerateStreamsSettings & settings,
const StreamCallback & callback,
const SubstreamData & data) const
{
addToPath(path);
addToPath(settings.path);
path.back().data = data;
settings.path.back().data = data;
path.back().creator = std::make_shared<SubcolumnCreator>(name, escape_delimiter);
settings.path.back().creator = std::make_shared<SubcolumnCreator>(name, escape_delimiter);
nested_serialization->enumerateStreams(path, callback, data);
nested_serialization->enumerateStreams(settings, callback, data);
path.pop_back();
settings.path.pop_back();
}
void SerializationNamed::serializeBinaryBulkStatePrefix(


@ -26,7 +26,7 @@ public:
const String & getElementName() const { return name; }
void enumerateStreams(
SubstreamPath & path,
EnumerateStreamsSettings & settings,
const StreamCallback & callback,
const SubstreamData & data) const override;


@ -38,38 +38,35 @@ ColumnPtr SerializationNullable::SubcolumnCreator::create(const ColumnPtr & prev
}
void SerializationNullable::enumerateStreams(
SubstreamPath & path,
EnumerateStreamsSettings & settings,
const StreamCallback & callback,
const SubstreamData & data) const
{
const auto * type_nullable = data.type ? &assert_cast<const DataTypeNullable &>(*data.type) : nullptr;
const auto * column_nullable = data.column ? &assert_cast<const ColumnNullable &>(*data.column) : nullptr;
path.push_back(Substream::NullMap);
auto null_map_serialization = std::make_shared<SerializationNamed>(std::make_shared<SerializationNumber<UInt8>>(), "null", false);
path.back().data =
{
std::make_shared<SerializationNamed>(std::make_shared<SerializationNumber<UInt8>>(), "null", false),
type_nullable ? std::make_shared<DataTypeUInt8>() : nullptr,
column_nullable ? column_nullable->getNullMapColumnPtr() : nullptr,
data.serialization_info,
};
callback(path);
settings.path.push_back(Substream::NullMap);
auto null_map_data = SubstreamData(null_map_serialization)
.withType(type_nullable ? std::make_shared<DataTypeUInt8>() : nullptr)
.withColumn(column_nullable ? column_nullable->getNullMapColumnPtr() : nullptr)
.withSerializationInfo(data.serialization_info);
path.back() = Substream::NullableElements;
settings.path.back().data = null_map_data;
path.back().creator = std::make_shared<SubcolumnCreator>(path.back().data.column);
callback(settings.path);
path.back().data = data;
SubstreamData next_data =
settings.path.back() = Substream::NullableElements;
{
settings.path.back().creator = std::make_shared<SubcolumnCreator>(null_map_data.column);
nested,
settings.path.back().data = data;
type_nullable ? type_nullable->getNestedType() : nullptr,
column_nullable ? column_nullable->getNestedColumnPtr() : nullptr,
data.serialization_info,
};
nested->enumerateStreams(path, callback, next_data);
auto next_data = SubstreamData(nested)
path.pop_back();
.withType(type_nullable ? type_nullable->getNestedType() : nullptr)
.withColumn(column_nullable ? column_nullable->getNestedColumnPtr() : nullptr)
.withSerializationInfo(data.serialization_info);
nested->enumerateStreams(settings, callback, next_data);
settings.path.pop_back();
}
void SerializationNullable::serializeBinaryBulkStatePrefix(


@ -14,7 +14,7 @@ public:
explicit SerializationNullable(const SerializationPtr & nested_) : nested(nested_) {}
void enumerateStreams(
SubstreamPath & path,
EnumerateStreamsSettings & settings,
const StreamCallback & callback,
const SubstreamData & data) const override;


@ -148,39 +148,33 @@ ColumnPtr SerializationSparse::SubcolumnCreator::create(const ColumnPtr & prev)
} }
void SerializationSparse::enumerateStreams( void SerializationSparse::enumerateStreams(
SubstreamPath & path, EnumerateStreamsSettings & settings,
const StreamCallback & callback, const StreamCallback & callback,
const SubstreamData & data) const const SubstreamData & data) const
{ {
const auto * column_sparse = data.column ? &assert_cast<const ColumnSparse &>(*data.column) : nullptr; const auto * column_sparse = data.column ? &assert_cast<const ColumnSparse &>(*data.column) : nullptr;
size_t column_size = column_sparse ? column_sparse->size() : 0; size_t column_size = column_sparse ? column_sparse->size() : 0;
path.push_back(Substream::SparseOffsets); settings.path.push_back(Substream::SparseOffsets);
path.back().data = auto offsets_data = SubstreamData(std::make_shared<SerializationNumber<UInt64>>())
{ .withType(data.type ? std::make_shared<DataTypeUInt64>() : nullptr)
std::make_shared<SerializationNumber<UInt64>>(), .withColumn(column_sparse ? column_sparse->getOffsetsPtr() : nullptr)
data.type ? std::make_shared<DataTypeUInt64>() : nullptr, .withSerializationInfo(data.serialization_info);
column_sparse ? column_sparse->getOffsetsPtr() : nullptr,
data.serialization_info,
};
callback(path); settings.path.back().data = offsets_data;
callback(settings.path);
path.back() = Substream::SparseElements; settings.path.back() = Substream::SparseElements;
path.back().creator = std::make_shared<SubcolumnCreator>(path.back().data.column, column_size); settings.path.back().creator = std::make_shared<SubcolumnCreator>(offsets_data.column, column_size);
path.back().data = data; settings.path.back().data = data;
SubstreamData next_data = auto next_data = SubstreamData(nested)
{ .withType(data.type)
nested, .withColumn(column_sparse ? column_sparse->getValuesPtr() : nullptr)
data.type, .withSerializationInfo(data.serialization_info);
column_sparse ? column_sparse->getValuesPtr() : nullptr,
data.serialization_info,
};
nested->enumerateStreams(path, callback, next_data); nested->enumerateStreams(settings, callback, next_data);
path.pop_back(); settings.path.pop_back();
} }
void SerializationSparse::serializeBinaryBulkStatePrefix( void SerializationSparse::serializeBinaryBulkStatePrefix(


@ -28,7 +28,7 @@ public:
Kind getKind() const override { return Kind::SPARSE; } Kind getKind() const override { return Kind::SPARSE; }
virtual void enumerateStreams( virtual void enumerateStreams(
SubstreamPath & path, EnumerateStreamsSettings & settings,
const StreamCallback & callback, const StreamCallback & callback,
const SubstreamData & data) const override; const SubstreamData & data) const override;


@ -283,7 +283,7 @@ void SerializationTuple::deserializeTextCSV(IColumn & column, ReadBuffer & istr,
} }
void SerializationTuple::enumerateStreams( void SerializationTuple::enumerateStreams(
SubstreamPath & path, EnumerateStreamsSettings & settings,
const StreamCallback & callback, const StreamCallback & callback,
const SubstreamData & data) const const SubstreamData & data) const
{ {
@ -293,15 +293,12 @@ void SerializationTuple::enumerateStreams(
for (size_t i = 0; i < elems.size(); ++i) for (size_t i = 0; i < elems.size(); ++i)
{ {
SubstreamData next_data = auto next_data = SubstreamData(elems[i])
{ .withType(type_tuple ? type_tuple->getElement(i) : nullptr)
elems[i], .withColumn(column_tuple ? column_tuple->getColumnPtr(i) : nullptr)
type_tuple ? type_tuple->getElement(i) : nullptr, .withSerializationInfo(info_tuple ? info_tuple->getElementInfo(i) : nullptr);
column_tuple ? column_tuple->getColumnPtr(i) : nullptr,
info_tuple ? info_tuple->getElementInfo(i) : nullptr,
};
elems[i]->enumerateStreams(path, callback, next_data); elems[i]->enumerateStreams(settings, callback, next_data);
} }
} }


@ -34,7 +34,7 @@ public:
/** Each sub-column in a tuple is serialized in separate stream. /** Each sub-column in a tuple is serialized in separate stream.
*/ */
void enumerateStreams( void enumerateStreams(
SubstreamPath & path, EnumerateStreamsSettings & settings,
const StreamCallback & callback, const StreamCallback & callback,
const SubstreamData & data) const override; const SubstreamData & data) const override;


@ -5,11 +5,11 @@ namespace DB
{ {
void SerializationWrapper::enumerateStreams( void SerializationWrapper::enumerateStreams(
SubstreamPath & path, EnumerateStreamsSettings & settings,
const StreamCallback & callback, const StreamCallback & callback,
const SubstreamData & data) const const SubstreamData & data) const
{ {
nested_serialization->enumerateStreams(path, callback, data); nested_serialization->enumerateStreams(settings, callback, data);
} }
void SerializationWrapper::serializeBinaryBulkStatePrefix( void SerializationWrapper::serializeBinaryBulkStatePrefix(


@ -21,7 +21,7 @@ public:
Kind getKind() const override { return nested_serialization->getKind(); } Kind getKind() const override { return nested_serialization->getKind(); }
void enumerateStreams( void enumerateStreams(
SubstreamPath & path, EnumerateStreamsSettings & settings,
const StreamCallback & callback, const StreamCallback & callback,
const SubstreamData & data) const override; const SubstreamData & data) const override;


@ -31,7 +31,7 @@ void FileCacheSettings::loadFromConfig(const Poco::Util::AbstractConfiguration &
enable_filesystem_query_cache_limit = config.getUInt64(config_prefix + ".enable_filesystem_query_cache_limit", false); enable_filesystem_query_cache_limit = config.getUInt64(config_prefix + ".enable_filesystem_query_cache_limit", false);
enable_cache_hits_threshold = config.getUInt64(config_prefix + ".enable_cache_hits_threshold", REMOTE_FS_OBJECTS_CACHE_ENABLE_HITS_THRESHOLD); enable_cache_hits_threshold = config.getUInt64(config_prefix + ".enable_cache_hits_threshold", REMOTE_FS_OBJECTS_CACHE_ENABLE_HITS_THRESHOLD);
do_not_evict_index_and_mark_files = config.getUInt64(config_prefix + ".do_not_evict_index_and_mark_files", true); do_not_evict_index_and_mark_files = config.getUInt64(config_prefix + ".do_not_evict_index_and_mark_files", false);
} }
} }


@ -163,7 +163,7 @@ BlockIO InterpreterDescribeQuery::execute()
res_columns[6]->insertDefault(); res_columns[6]->insertDefault();
res_columns[7]->insert(1u); res_columns[7]->insert(1u);
}, { type->getDefaultSerialization(), type, nullptr, nullptr }); }, ISerialization::SubstreamData(type->getDefaultSerialization()).withType(type));
} }
} }


@ -12,6 +12,7 @@
#include <Parsers/ASTFunction.h> #include <Parsers/ASTFunction.h>
#include <utility> #include <utility>
#include <DataTypes/DataTypesNumber.h> #include <DataTypes/DataTypesNumber.h>
#include <DataTypes/ObjectUtils.h>
#include <Interpreters/RequiredSourceColumnsVisitor.h> #include <Interpreters/RequiredSourceColumnsVisitor.h>
#include <Common/checkStackSize.h> #include <Common/checkStackSize.h>
#include <Storages/ColumnsDescription.h> #include <Storages/ColumnsDescription.h>
@ -187,29 +188,56 @@ ActionsDAGPtr evaluateMissingDefaults(
return createExpressions(header, expr_list, save_unneeded_columns, context); return createExpressions(header, expr_list, save_unneeded_columns, context);
} }
static bool arrayHasNoElementsRead(const IColumn & column) static std::unordered_map<String, ColumnPtr> collectOffsetsColumns(
const NamesAndTypesList & available_columns, const Columns & res_columns)
{ {
const auto * column_array = typeid_cast<const ColumnArray *>(&column); std::unordered_map<String, ColumnPtr> offsets_columns;
if (!column_array) auto available_column = available_columns.begin();
return false; for (size_t i = 0; i < available_columns.size(); ++i, ++available_column)
{
if (res_columns[i] == nullptr || isColumnConst(*res_columns[i]))
continue;
size_t size = column_array->size(); auto serialization = IDataType::getSerialization(*available_column);
if (!size) serialization->enumerateStreams([&](const auto & subpath)
return false; {
if (subpath.empty() || subpath.back().type != ISerialization::Substream::ArraySizes)
return;
size_t data_size = column_array->getData().size(); auto stream_name = ISerialization::getFileNameForStream(*available_column, subpath);
if (data_size) const auto & current_offsets_column = subpath.back().data.column;
return false;
size_t last_offset = column_array->getOffsets()[size - 1]; /// If for some reason multiple offsets columns are present
return last_offset != 0; /// for the same nested data structure, choose the one that is not empty.
if (current_offsets_column && !current_offsets_column->empty())
{
auto & offsets_column = offsets_columns[stream_name];
if (!offsets_column)
offsets_column = current_offsets_column;
#ifndef NDEBUG
const auto & offsets_data = assert_cast<const ColumnUInt64 &>(*offsets_column).getData();
const auto & current_offsets_data = assert_cast<const ColumnUInt64 &>(*current_offsets_column).getData();
if (offsets_data != current_offsets_data)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Found non-equal columns with offsets (sizes: {} and {}) for stream {}",
offsets_data.size(), current_offsets_data.size(), stream_name);
#endif
}
}, available_column->type, res_columns[i]);
}
return offsets_columns;
} }
void fillMissingColumns( void fillMissingColumns(
Columns & res_columns, Columns & res_columns,
size_t num_rows, size_t num_rows,
const NamesAndTypesList & requested_columns, const NamesAndTypesList & requested_columns,
const NamesAndTypesList & available_columns,
const NameSet & partially_read_columns,
StorageMetadataPtr metadata_snapshot) StorageMetadataPtr metadata_snapshot)
{ {
size_t num_columns = requested_columns.size(); size_t num_columns = requested_columns.size();
@ -218,65 +246,79 @@ void fillMissingColumns(
"Invalid number of columns passed to fillMissingColumns. Expected {}, got {}", "Invalid number of columns passed to fillMissingColumns. Expected {}, got {}",
num_columns, res_columns.size()); num_columns, res_columns.size());
/// For a missing column of a nested data structure we must create not a column of empty /// For a missing column of a nested data structure
/// arrays, but a column of arrays of correct length. /// we must create not a column of empty arrays,
/// but a column of arrays of correct length.
/// First, collect offset columns for all arrays in the block. /// First, collect offset columns for all arrays in the block.
auto offsets_columns = collectOffsetsColumns(available_columns, res_columns);
std::unordered_map<String, ColumnPtr> offset_columns; /// Insert default values only for columns without default expressions.
auto requested_column = requested_columns.begin(); auto requested_column = requested_columns.begin();
for (size_t i = 0; i < num_columns; ++i, ++requested_column) for (size_t i = 0; i < num_columns; ++i, ++requested_column)
{
if (res_columns[i] == nullptr)
continue;
if (const auto * array = typeid_cast<const ColumnArray *>(res_columns[i].get()))
{
String offsets_name = Nested::extractTableName(requested_column->name);
auto & offsets_column = offset_columns[offsets_name];
/// If for some reason multiple offsets columns are present for the same nested data structure,
/// choose the one that is not empty.
if (!offsets_column || offsets_column->empty())
offsets_column = array->getOffsetsPtr();
}
}
/// insert default values only for columns without default expressions
requested_column = requested_columns.begin();
for (size_t i = 0; i < num_columns; ++i, ++requested_column)
{ {
const auto & [name, type] = *requested_column; const auto & [name, type] = *requested_column;
if (res_columns[i] && arrayHasNoElementsRead(*res_columns[i])) if (res_columns[i] && partially_read_columns.contains(name))
res_columns[i] = nullptr; res_columns[i] = nullptr;
if (res_columns[i] == nullptr) if (res_columns[i])
continue;
if (metadata_snapshot && metadata_snapshot->getColumns().hasDefault(name))
continue;
std::vector<ColumnPtr> current_offsets;
size_t num_dimensions = 0;
const auto * array_type = typeid_cast<const DataTypeArray *>(type.get());
if (array_type && !offsets_columns.empty())
{ {
if (metadata_snapshot && metadata_snapshot->getColumns().hasDefault(name)) num_dimensions = getNumberOfDimensions(*array_type);
continue; current_offsets.resize(num_dimensions);
String offsets_name = Nested::extractTableName(name); auto serialization = IDataType::getSerialization(*requested_column);
auto offset_it = offset_columns.find(offsets_name); serialization->enumerateStreams([&](const auto & subpath)
const auto * array_type = typeid_cast<const DataTypeArray *>(type.get());
if (offset_it != offset_columns.end() && array_type)
{ {
const auto & nested_type = array_type->getNestedType(); if (subpath.empty() || subpath.back().type != ISerialization::Substream::ArraySizes)
ColumnPtr offsets_column = offset_it->second; return;
size_t nested_rows = typeid_cast<const ColumnUInt64 &>(*offsets_column).getData().back();
ColumnPtr nested_column = size_t level = ISerialization::getArrayLevel(subpath);
nested_type->createColumnConstWithDefaultValue(nested_rows)->convertToFullColumnIfConst(); assert(level < num_dimensions);
res_columns[i] = ColumnArray::create(nested_column, offsets_column); auto stream_name = ISerialization::getFileNameForStream(*requested_column, subpath);
} auto it = offsets_columns.find(stream_name);
else if (it != offsets_columns.end())
current_offsets[level] = it->second;
});
for (size_t j = 0; j < num_dimensions; ++j)
{ {
/// We must turn a constant column into a full column because the interpreter could infer if (!current_offsets[j])
/// that it is constant everywhere but in some blocks (from other parts) it can be a full column. {
res_columns[i] = type->createColumnConstWithDefaultValue(num_rows)->convertToFullColumnIfConst(); current_offsets.resize(j);
break;
}
} }
} }
if (!current_offsets.empty())
{
size_t num_empty_dimensions = num_dimensions - current_offsets.size();
auto scalar_type = createArrayOfType(getBaseTypeOfArray(type), num_empty_dimensions);
size_t data_size = assert_cast<const ColumnUInt64 &>(*current_offsets.back()).getData().back();
res_columns[i] = scalar_type->createColumnConstWithDefaultValue(data_size)->convertToFullColumnIfConst();
for (auto it = current_offsets.rbegin(); it != current_offsets.rend(); ++it)
res_columns[i] = ColumnArray::create(res_columns[i], *it);
}
else
{
/// We must turn a constant column into a full column because the interpreter could infer
/// that it is constant everywhere but in some blocks (from other parts) it can be a full column.
res_columns[i] = type->createColumnConstWithDefaultValue(num_rows)->convertToFullColumnIfConst();
}
} }
} }
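The rewritten fillMissingColumns above rebuilds a missing Array column from whatever offsets columns were read for sibling Nested subcolumns: it fills a scalar column with defaults sized by the innermost offsets, then wraps it level by level from the innermost offsets outwards. A small self-contained sketch of that reconstruction order follows, using plain vectors in place of ColumnArray and ColumnUInt64; the names and data are illustrative only.

#include <cstdint>
#include <cstdio>
#include <vector>

// Offsets are cumulative: offsets[i] is the end position of row i in the
// flattened data. Two levels of offsets describe Array(Array(T)).
using Offsets = std::vector<uint64_t>;

int main()
{
    // Offsets collected from an existing sibling column, outermost level first.
    std::vector<Offsets> collected_offsets = {
        {2, 3},     // level 0: row 0 has 2 sub-arrays, row 1 has 1
        {1, 3, 3},  // level 1: the 3 sub-arrays have 1, 2 and 0 elements
    };

    // The scalar payload gets as many default values as the innermost
    // offsets column demands (its last cumulative offset).
    size_t data_size = collected_offsets.back().back();
    std::vector<int64_t> defaults(data_size, 0);
    printf("scalar defaults: %zu values\n", defaults.size());

    // Wrapping happens from the innermost level outwards, mirroring the loop
    // over current_offsets.rbegin()..rend() in the hunk above.
    for (auto it = collected_offsets.rbegin(); it != collected_offsets.rend(); ++it)
        printf("wrap with offsets column of %zu rows\n", it->size());
    return 0;
}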


@ -1,5 +1,6 @@
#pragma once #pragma once
#include <Core/Names.h>
#include <Interpreters/Context_fwd.h> #include <Interpreters/Context_fwd.h>
#include <Common/COW.h> #include <Common/COW.h>
@ -43,6 +44,8 @@ void fillMissingColumns(
Columns & res_columns, Columns & res_columns,
size_t num_rows, size_t num_rows,
const NamesAndTypesList & requested_columns, const NamesAndTypesList & requested_columns,
const NamesAndTypesList & available_columns,
const NameSet & partially_read_columns,
StorageMetadataPtr metadata_snapshot); StorageMetadataPtr metadata_snapshot);
} }


@ -780,7 +780,7 @@ void ColumnsDescription::addSubcolumns(const String & name_in_storage, const Dat
"Cannot add subcolumn {}: column with this name already exists", subcolumn.name); "Cannot add subcolumn {}: column with this name already exists", subcolumn.name);
subcolumns.get<0>().insert(std::move(subcolumn)); subcolumns.get<0>().insert(std::move(subcolumn));
}, {type_in_storage->getDefaultSerialization(), type_in_storage, nullptr, nullptr}); }, ISerialization::SubstreamData(type_in_storage->getDefaultSerialization()).withType(type_in_storage));
} }
void ColumnsDescription::removeSubcolumns(const String & name_in_storage) void ColumnsDescription::removeSubcolumns(const String & name_in_storage)


@ -650,23 +650,31 @@ bool DataPartStorageOnDisk::shallParticipateInMerges(const IStoragePolicy & stor
} }
void DataPartStorageOnDisk::backup( void DataPartStorageOnDisk::backup(
TemporaryFilesOnDisks & temp_dirs,
const MergeTreeDataPartChecksums & checksums, const MergeTreeDataPartChecksums & checksums,
const NameSet & files_without_checksums, const NameSet & files_without_checksums,
const String & path_in_backup, const String & path_in_backup,
BackupEntries & backup_entries) const BackupEntries & backup_entries,
bool make_temporary_hard_links,
TemporaryFilesOnDisks * temp_dirs) const
{ {
fs::path part_path_on_disk = fs::path{root_path} / part_dir; fs::path part_path_on_disk = fs::path{root_path} / part_dir;
fs::path part_path_in_backup = fs::path{path_in_backup} / part_dir; fs::path part_path_in_backup = fs::path{path_in_backup} / part_dir;
auto disk = volume->getDisk(); auto disk = volume->getDisk();
auto temp_dir_it = temp_dirs.find(disk);
if (temp_dir_it == temp_dirs.end()) fs::path temp_part_dir;
temp_dir_it = temp_dirs.emplace(disk, std::make_shared<TemporaryFileOnDisk>(disk, "tmp/")).first; std::shared_ptr<TemporaryFileOnDisk> temp_dir_owner;
auto temp_dir_owner = temp_dir_it->second; if (make_temporary_hard_links)
fs::path temp_dir = temp_dir_owner->getPath(); {
fs::path temp_part_dir = temp_dir / part_path_in_backup.relative_path(); assert(temp_dirs);
disk->createDirectories(temp_part_dir); auto temp_dir_it = temp_dirs->find(disk);
if (temp_dir_it == temp_dirs->end())
temp_dir_it = temp_dirs->emplace(disk, std::make_shared<TemporaryFileOnDisk>(disk, "tmp/")).first;
temp_dir_owner = temp_dir_it->second;
fs::path temp_dir = temp_dir_owner->getPath();
temp_part_dir = temp_dir / part_path_in_backup.relative_path();
disk->createDirectories(temp_part_dir);
}
/// For example, /// For example,
/// part_path_in_backup = /data/test/table/0_1_1_0 /// part_path_in_backup = /data/test/table/0_1_1_0
@ -683,13 +691,18 @@ void DataPartStorageOnDisk::backup(
continue; /// Skip *.proj files - they're actually directories and will be handled. continue; /// Skip *.proj files - they're actually directories and will be handled.
String filepath_on_disk = part_path_on_disk / filepath; String filepath_on_disk = part_path_on_disk / filepath;
String filepath_in_backup = part_path_in_backup / filepath; String filepath_in_backup = part_path_in_backup / filepath;
String hardlink_filepath = temp_part_dir / filepath;
disk->createHardLink(filepath_on_disk, hardlink_filepath); if (make_temporary_hard_links)
{
String hardlink_filepath = temp_part_dir / filepath;
disk->createHardLink(filepath_on_disk, hardlink_filepath);
filepath_on_disk = hardlink_filepath;
}
UInt128 file_hash{checksum.file_hash.first, checksum.file_hash.second}; UInt128 file_hash{checksum.file_hash.first, checksum.file_hash.second};
backup_entries.emplace_back( backup_entries.emplace_back(
filepath_in_backup, filepath_in_backup,
std::make_unique<BackupEntryFromImmutableFile>(disk, hardlink_filepath, checksum.file_size, file_hash, temp_dir_owner)); std::make_unique<BackupEntryFromImmutableFile>(disk, filepath_on_disk, checksum.file_size, file_hash, temp_dir_owner));
} }
for (const auto & filepath : files_without_checksums) for (const auto & filepath : files_without_checksums)


@ -89,11 +89,12 @@ public:
bool shallParticipateInMerges(const IStoragePolicy &) const override; bool shallParticipateInMerges(const IStoragePolicy &) const override;
void backup( void backup(
TemporaryFilesOnDisks & temp_dirs,
const MergeTreeDataPartChecksums & checksums, const MergeTreeDataPartChecksums & checksums,
const NameSet & files_without_checksums, const NameSet & files_without_checksums,
const String & path_in_backup, const String & path_in_backup,
BackupEntries & backup_entries) const override; BackupEntries & backup_entries,
bool make_temporary_hard_links,
TemporaryFilesOnDisks * temp_dirs) const override;
DataPartStoragePtr freeze( DataPartStoragePtr freeze(
const std::string & to, const std::string & to,


@ -422,8 +422,8 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchSelectedPart(
const auto data_settings = data.getSettings(); const auto data_settings = data.getSettings();
if (data_settings->allow_remote_fs_zero_copy_replication && !try_zero_copy) if (data.canUseZeroCopyReplication() && !try_zero_copy)
LOG_WARNING(log, "Zero copy replication enabled, but trying to fetch part {} without zero copy", part_name); LOG_INFO(log, "Zero copy replication enabled, but trying to fetch part {} without zero copy", part_name);
/// It should be "tmp-fetch_" and not "tmp_fetch_", because we can fetch part to detached/, /// It should be "tmp-fetch_" and not "tmp_fetch_", because we can fetch part to detached/,
/// but detached part name prefix should not contain underscore. /// but detached part name prefix should not contain underscore.
@ -479,8 +479,8 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchSelectedPart(
} }
else else
{ {
if (data_settings->allow_remote_fs_zero_copy_replication) if (data.canUseZeroCopyReplication())
LOG_WARNING(log, "Cannot select any zero-copy disk for {}", part_name); LOG_INFO(log, "Cannot select any zero-copy disk for {}", part_name);
try_zero_copy = false; try_zero_copy = false;
} }


@ -177,11 +177,12 @@ public:
/// Also creates a new tmp_dir for internal disk (if disk is mentioned the first time). /// Also creates a new tmp_dir for internal disk (if disk is mentioned the first time).
using TemporaryFilesOnDisks = std::map<DiskPtr, std::shared_ptr<TemporaryFileOnDisk>>; using TemporaryFilesOnDisks = std::map<DiskPtr, std::shared_ptr<TemporaryFileOnDisk>>;
virtual void backup( virtual void backup(
TemporaryFilesOnDisks & temp_dirs,
const MergeTreeDataPartChecksums & checksums, const MergeTreeDataPartChecksums & checksums,
const NameSet & files_without_checksums, const NameSet & files_without_checksums,
const String & path_in_backup, const String & path_in_backup,
BackupEntries & backup_entries) const = 0; BackupEntries & backup_entries,
bool make_temporary_hard_links,
TemporaryFilesOnDisks * temp_dirs) const = 0;
/// Creates hardlinks into 'to/dir_path' for every file in data part. /// Creates hardlinks into 'to/dir_path' for every file in data part.
/// Callback is called after hardlinks are created, but before 'delete-on-destroy.txt' marker is removed. /// Callback is called after hardlinks are created, but before 'delete-on-destroy.txt' marker is removed.


@ -445,11 +445,11 @@ void IMergeTreeDataPart::setColumns(const NamesAndTypesList & new_columns, const
column_name_to_position.clear(); column_name_to_position.clear();
column_name_to_position.reserve(new_columns.size()); column_name_to_position.reserve(new_columns.size());
size_t pos = 0; size_t pos = 0;
for (const auto & column : columns)
column_name_to_position.emplace(column.name, pos++);
for (const auto & column : columns) for (const auto & column : columns)
{ {
column_name_to_position.emplace(column.name, pos++);
auto it = serialization_infos.find(column.name); auto it = serialization_infos.find(column.name);
auto serialization = it == serialization_infos.end() auto serialization = it == serialization_infos.end()
? IDataType::getSerialization(column) ? IDataType::getSerialization(column)
@ -461,7 +461,7 @@ void IMergeTreeDataPart::setColumns(const NamesAndTypesList & new_columns, const
{ {
auto full_name = Nested::concatenateName(column.name, subname); auto full_name = Nested::concatenateName(column.name, subname);
serializations.emplace(full_name, subdata.serialization); serializations.emplace(full_name, subdata.serialization);
}, {serialization, nullptr, nullptr, nullptr}); }, ISerialization::SubstreamData(serialization));
} }
columns_description = ColumnsDescription(columns); columns_description = ColumnsDescription(columns);
@ -1352,7 +1352,6 @@ bool IMergeTreeDataPart::assertHasValidVersionMetadata() const
} }
} }
void IMergeTreeDataPart::appendFilesOfColumns(Strings & files) void IMergeTreeDataPart::appendFilesOfColumns(Strings & files)
{ {
files.push_back("columns.txt"); files.push_back("columns.txt");


@ -63,7 +63,13 @@ void IMergeTreeReader::fillMissingColumns(Columns & res_columns, bool & should_e
{ {
try try
{ {
DB::fillMissingColumns(res_columns, num_rows, requested_columns, metadata_snapshot); NamesAndTypesList available_columns(columns_to_read.begin(), columns_to_read.end());
DB::fillMissingColumns(
res_columns, num_rows,
Nested::convertToSubcolumns(requested_columns),
Nested::convertToSubcolumns(available_columns),
partially_read_columns, metadata_snapshot);
should_evaluate_missing_defaults = std::any_of( should_evaluate_missing_defaults = std::any_of(
res_columns.begin(), res_columns.end(), [](const auto & column) { return column == nullptr; }); res_columns.begin(), res_columns.end(), [](const auto & column) { return column == nullptr; });
} }
@ -201,20 +207,56 @@ void IMergeTreeReader::performRequiredConversions(Columns & res_columns) const
} }
} }
IMergeTreeReader::ColumnPosition IMergeTreeReader::findColumnForOffsets(const String & column_name) const IMergeTreeReader::ColumnPosition IMergeTreeReader::findColumnForOffsets(const NameAndTypePair & required_column) const
{ {
String table_name = Nested::extractTableName(column_name); auto get_offsets_streams = [](const auto & serialization, const auto & name_in_storage)
{
Names offsets_streams;
serialization->enumerateStreams([&](const auto & subpath)
{
if (subpath.empty() || subpath.back().type != ISerialization::Substream::ArraySizes)
return;
auto subname = ISerialization::getSubcolumnNameForStream(subpath);
auto full_name = Nested::concatenateName(name_in_storage, subname);
offsets_streams.push_back(full_name);
});
return offsets_streams;
};
auto required_name_in_storage = Nested::extractTableName(required_column.getNameInStorage());
auto required_offsets_streams = get_offsets_streams(getSerializationInPart(required_column), required_name_in_storage);
size_t max_matched_streams = 0;
ColumnPosition position;
/// Find column that has maximal number of matching
/// offsets columns with required_column.
for (const auto & part_column : data_part_info_for_read->getColumns()) for (const auto & part_column : data_part_info_for_read->getColumns())
{ {
if (typeid_cast<const DataTypeArray *>(part_column.type.get())) auto name_in_storage = Nested::extractTableName(part_column.name);
if (name_in_storage != required_name_in_storage)
continue;
auto offsets_streams = get_offsets_streams(data_part_info_for_read->getSerialization(part_column), name_in_storage);
NameSet offsets_streams_set(offsets_streams.begin(), offsets_streams.end());
size_t i = 0;
for (; i < required_offsets_streams.size(); ++i)
{ {
auto position = data_part_info_for_read->getColumnPosition(part_column.getNameInStorage()); if (!offsets_streams_set.contains(required_offsets_streams[i]))
if (position && Nested::extractTableName(part_column.name) == table_name) break;
return position; }
if (i && (!position || i > max_matched_streams))
{
max_matched_streams = i;
position = data_part_info_for_read->getColumnPosition(part_column.name);
} }
} }
return {}; return position;
} }
void IMergeTreeReader::checkNumberOfColumns(size_t num_columns_to_read) const void IMergeTreeReader::checkNumberOfColumns(size_t num_columns_to_read) const
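findColumnForOffsets now selects, among the part's columns that share the required column's Nested table name, the one whose offsets streams match the largest number of the required streams. A short standalone sketch of that selection rule, with stream names as plain strings (the names below are illustrative, not the MergeTree reader API):

#include <cstdio>
#include <string>
#include <unordered_set>
#include <vector>

// Count how many of the required offsets streams, taken in order,
// are present in a candidate column's set of streams.
static size_t countMatchedStreams(
    const std::vector<std::string> & required,
    const std::unordered_set<std::string> & candidate)
{
    size_t i = 0;
    while (i < required.size() && candidate.count(required[i]))
        ++i;
    return i;
}

int main()
{
    std::vector<std::string> required = {"n.size0", "n.a.size1"};

    std::vector<std::vector<std::string>> part_columns = {
        {"n.size0"},               // matches only the outer level
        {"n.size0", "n.a.size1"},  // matches both levels -> preferred
    };

    size_t best = 0;
    size_t best_pos = 0;
    for (size_t pos = 0; pos < part_columns.size(); ++pos)
    {
        std::unordered_set<std::string> streams(part_columns[pos].begin(), part_columns[pos].end());
        size_t matched = countMatchedStreams(required, streams);
        if (matched > best)
        {
            best = matched;
            best_pos = pos;
        }
    }

    printf("chose column #%zu with %zu matched offsets streams\n", best_pos, best);
    return 0;
}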


@ -92,7 +92,9 @@ protected:
MarkRanges all_mark_ranges; MarkRanges all_mark_ranges;
using ColumnPosition = std::optional<size_t>; using ColumnPosition = std::optional<size_t>;
ColumnPosition findColumnForOffsets(const String & column_name) const; ColumnPosition findColumnForOffsets(const NameAndTypePair & column) const;
NameSet partially_read_columns;
private: private:
/// Alter conversions, which must be applied on fly if required /// Alter conversions, which must be applied on fly if required


@ -3,6 +3,7 @@
#include <Backups/BackupEntriesCollector.h> #include <Backups/BackupEntriesCollector.h>
#include <Backups/BackupEntryFromImmutableFile.h> #include <Backups/BackupEntryFromImmutableFile.h>
#include <Backups/BackupEntryFromSmallFile.h> #include <Backups/BackupEntryFromSmallFile.h>
#include <Backups/BackupEntryWrappedWith.h>
#include <Backups/IBackup.h> #include <Backups/IBackup.h>
#include <Backups/RestorerFromBackup.h> #include <Backups/RestorerFromBackup.h>
#include <Compression/CompressedReadBuffer.h> #include <Compression/CompressedReadBuffer.h>
@ -4109,29 +4110,74 @@ void MergeTreeData::backupData(BackupEntriesCollector & backup_entries_collector
else else
data_parts = getVisibleDataPartsVector(local_context); data_parts = getVisibleDataPartsVector(local_context);
backup_entries_collector.addBackupEntries(backupParts(data_parts, data_path_in_backup)); backup_entries_collector.addBackupEntries(backupParts(data_parts, data_path_in_backup, local_context));
} }
BackupEntries MergeTreeData::backupParts(const DataPartsVector & data_parts, const String & data_path_in_backup) BackupEntries MergeTreeData::backupParts(const DataPartsVector & data_parts, const String & data_path_in_backup, const ContextPtr & local_context)
{ {
BackupEntries backup_entries; BackupEntries backup_entries;
std::map<DiskPtr, std::shared_ptr<TemporaryFileOnDisk>> temp_dirs; std::map<DiskPtr, std::shared_ptr<TemporaryFileOnDisk>> temp_dirs;
TableLockHolder table_lock;
for (const auto & part : data_parts) for (const auto & part : data_parts)
{ {
/// Hard links are the default way to ensure that we'll keep access to the files of parts.
bool make_temporary_hard_links = true;
bool hold_storage_and_part_ptrs = false;
bool hold_table_lock = false;
if (getStorageID().hasUUID())
{
/// Tables in atomic databases have UUIDs. When using an atomic database we don't have to create hard links to make a backup,
/// we can just hold smart pointers to the storage and to the data parts instead. That's enough to protect those files from being deleted
/// until the backup is done (see the calls `part.unique()` in grabOldParts() and table.unique() in DatabaseCatalog).
make_temporary_hard_links = false;
hold_storage_and_part_ptrs = true;
}
else if (supportsReplication() && part->data_part_storage->supportZeroCopyReplication() && getSettings()->allow_remote_fs_zero_copy_replication)
{
/// Hard links don't work correctly with zero copy replication.
make_temporary_hard_links = false;
hold_storage_and_part_ptrs = true;
hold_table_lock = true;
}
if (hold_table_lock && !table_lock)
table_lock = lockForShare(local_context->getCurrentQueryId(), local_context->getSettingsRef().lock_acquire_timeout);
BackupEntries backup_entries_from_part;
part->data_part_storage->backup( part->data_part_storage->backup(
temp_dirs, part->checksums, part->getFileNamesWithoutChecksums(), data_path_in_backup, backup_entries); part->checksums,
part->getFileNamesWithoutChecksums(),
data_path_in_backup,
backup_entries_from_part,
make_temporary_hard_links,
&temp_dirs);
auto projection_parts = part->getProjectionParts(); auto projection_parts = part->getProjectionParts();
for (const auto & [projection_name, projection_part] : projection_parts) for (const auto & [projection_name, projection_part] : projection_parts)
{ {
projection_part->data_part_storage->backup( projection_part->data_part_storage->backup(
temp_dirs,
projection_part->checksums, projection_part->checksums,
projection_part->getFileNamesWithoutChecksums(), projection_part->getFileNamesWithoutChecksums(),
fs::path{data_path_in_backup} / part->name, fs::path{data_path_in_backup} / part->name,
backup_entries); backup_entries_from_part,
make_temporary_hard_links,
&temp_dirs);
} }
if (hold_storage_and_part_ptrs)
{
/// Wrap backup entries with smart pointers to data parts and to the storage itself
/// (we'll be holding those smart pointers for as long as we'll be using the backup entries).
auto storage_and_part = std::make_pair(shared_from_this(), part);
if (hold_table_lock)
wrapBackupEntriesWith(backup_entries_from_part, std::make_pair(storage_and_part, table_lock));
else
wrapBackupEntriesWith(backup_entries_from_part, storage_and_part);
}
insertAtEnd(backup_entries, std::move(backup_entries_from_part));
} }
return backup_entries; return backup_entries;
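backupParts above now decides per part whether to hard-link files or simply keep the storage, the part and (for zero-copy parts) a table lock alive for as long as the backup entries exist; wrapBackupEntriesWith attaches those owners to each entry. A minimal sketch of that "wrap an entry with the resources it must keep alive" idea follows, with simplified stand-in types rather than the real BackupEntryWrappedWith interface.

#include <memory>
#include <string>
#include <utility>
#include <vector>

// Stand-ins: a backup entry and a wrapper that co-owns an arbitrary
// resource so it cannot be released while the backup is in progress.
struct BackupEntrySketch
{
    virtual ~BackupEntrySketch() = default;
    virtual std::string name() const = 0;
};

struct FileEntry : BackupEntrySketch
{
    explicit FileEntry(std::string path_) : path(std::move(path_)) {}
    std::string name() const override { return path; }
    std::string path;
};

template <typename Owner>
struct WrappedEntry : BackupEntrySketch
{
    WrappedEntry(std::shared_ptr<BackupEntrySketch> entry_, Owner owner_)
        : entry(std::move(entry_)), owner(std::move(owner_)) {}
    std::string name() const override { return entry->name(); }

    std::shared_ptr<BackupEntrySketch> entry;
    Owner owner; // e.g. a pair of storage and part pointers, kept alive with the entry
};

template <typename Owner>
void wrapAllWith(std::vector<std::shared_ptr<BackupEntrySketch>> & entries, const Owner & owner)
{
    for (auto & entry : entries)
        entry = std::make_shared<WrappedEntry<Owner>>(std::move(entry), owner);
}

int main()
{
    std::vector<std::shared_ptr<BackupEntrySketch>> entries;
    entries.push_back(std::make_shared<FileEntry>("data/test/table/0_1_1_0/data.bin"));

    auto part_holder = std::make_shared<int>(42); // stands in for the data part pointer
    wrapAllWith(entries, part_holder);            // entries now keep the "part" alive
    return entries.front()->name().empty() ? 1 : 0;
}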


@ -1231,7 +1231,7 @@ protected:
bool movePartsToSpace(const DataPartsVector & parts, SpacePtr space); bool movePartsToSpace(const DataPartsVector & parts, SpacePtr space);
/// Makes backup entries to backup the parts of this table. /// Makes backup entries to backup the parts of this table.
static BackupEntries backupParts(const DataPartsVector & data_parts, const String & data_path_in_backup); BackupEntries backupParts(const DataPartsVector & data_parts, const String & data_path_in_backup, const ContextPtr & local_context);
class RestoredPartsHolder; class RestoredPartsHolder;


@ -66,8 +66,7 @@ void MergeTreeDataPartWriterCompact::addStreams(const NameAndTypePair & column,
compressed_streams.emplace(stream_name, stream); compressed_streams.emplace(stream_name, stream);
}; };
ISerialization::SubstreamPath path; data_part->getSerialization(column.name)->enumerateStreams(callback, column.type);
data_part->getSerialization(column.name)->enumerateStreams(path, callback, column.type);
} }
namespace namespace


@ -121,7 +121,7 @@ void MergeTreeDataPartWriterWide::addStreams(
}; };
ISerialization::SubstreamPath path; ISerialization::SubstreamPath path;
data_part->getSerialization(column.name)->enumerateStreams(path, callback, column.type); data_part->getSerialization(column.name)->enumerateStreams(callback, column.type);
} }
@ -255,10 +255,9 @@ void MergeTreeDataPartWriterWide::write(const Block & block, const IColumn::Perm
void MergeTreeDataPartWriterWide::writeSingleMark( void MergeTreeDataPartWriterWide::writeSingleMark(
const NameAndTypePair & column, const NameAndTypePair & column,
WrittenOffsetColumns & offset_columns, WrittenOffsetColumns & offset_columns,
size_t number_of_rows, size_t number_of_rows)
ISerialization::SubstreamPath & path)
{ {
StreamsWithMarks marks = getCurrentMarksForColumn(column, offset_columns, path); StreamsWithMarks marks = getCurrentMarksForColumn(column, offset_columns);
for (const auto & mark : marks) for (const auto & mark : marks)
flushMarkToFile(mark, number_of_rows); flushMarkToFile(mark, number_of_rows);
} }
@ -274,8 +273,7 @@ void MergeTreeDataPartWriterWide::flushMarkToFile(const StreamNameAndMark & stre
StreamsWithMarks MergeTreeDataPartWriterWide::getCurrentMarksForColumn( StreamsWithMarks MergeTreeDataPartWriterWide::getCurrentMarksForColumn(
const NameAndTypePair & column, const NameAndTypePair & column,
WrittenOffsetColumns & offset_columns, WrittenOffsetColumns & offset_columns)
ISerialization::SubstreamPath & path)
{ {
StreamsWithMarks result; StreamsWithMarks result;
data_part->getSerialization(column.name)->enumerateStreams([&] (const ISerialization::SubstreamPath & substream_path) data_part->getSerialization(column.name)->enumerateStreams([&] (const ISerialization::SubstreamPath & substream_path)
@ -300,7 +298,7 @@ StreamsWithMarks MergeTreeDataPartWriterWide::getCurrentMarksForColumn(
stream_with_mark.mark.offset_in_decompressed_block = stream.compressed.offset(); stream_with_mark.mark.offset_in_decompressed_block = stream.compressed.offset();
result.push_back(stream_with_mark); result.push_back(stream_with_mark);
}, path); });
return result; return result;
} }
@ -328,7 +326,7 @@ void MergeTreeDataPartWriterWide::writeSingleGranule(
return; return;
column_streams[stream_name]->compressed.nextIfAtEnd(); column_streams[stream_name]->compressed.nextIfAtEnd();
}, serialize_settings.path); });
} }
/// Column must not be empty. (column.size() !== 0) /// Column must not be empty. (column.size() !== 0)
@ -366,7 +364,7 @@ void MergeTreeDataPartWriterWide::writeColumn(
{ {
if (last_non_written_marks.contains(name)) if (last_non_written_marks.contains(name))
throw Exception(ErrorCodes::LOGICAL_ERROR, "We have to add new mark for column, but already have non written mark. Current mark {}, total marks {}, offset {}", getCurrentMark(), index_granularity.getMarksCount(), rows_written_in_last_mark); throw Exception(ErrorCodes::LOGICAL_ERROR, "We have to add new mark for column, but already have non written mark. Current mark {}, total marks {}, offset {}", getCurrentMark(), index_granularity.getMarksCount(), rows_written_in_last_mark);
last_non_written_marks[name] = getCurrentMarksForColumn(name_and_type, offset_columns, serialize_settings.path); last_non_written_marks[name] = getCurrentMarksForColumn(name_and_type, offset_columns);
} }
writeSingleGranule( writeSingleGranule(
@ -390,7 +388,7 @@ void MergeTreeDataPartWriterWide::writeColumn(
} }
} }
serialization->enumerateStreams([&] (const ISerialization::SubstreamPath & substream_path) serialization->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path)
{ {
bool is_offsets = !substream_path.empty() && substream_path.back().type == ISerialization::Substream::ArraySizes; bool is_offsets = !substream_path.empty() && substream_path.back().type == ISerialization::Substream::ArraySizes;
if (is_offsets) if (is_offsets)
@ -398,7 +396,7 @@ void MergeTreeDataPartWriterWide::writeColumn(
String stream_name = ISerialization::getFileNameForStream(name_and_type, substream_path); String stream_name = ISerialization::getFileNameForStream(name_and_type, substream_path);
offset_columns.insert(stream_name); offset_columns.insert(stream_name);
} }
}, serialize_settings.path); });
} }
@ -553,7 +551,7 @@ void MergeTreeDataPartWriterWide::fillDataChecksums(IMergeTreeDataPart::Checksum
} }
if (write_final_mark) if (write_final_mark)
writeFinalMark(*it, offset_columns, serialize_settings.path); writeFinalMark(*it, offset_columns);
} }
} }
@ -618,10 +616,9 @@ void MergeTreeDataPartWriterWide::finish(bool sync)
void MergeTreeDataPartWriterWide::writeFinalMark( void MergeTreeDataPartWriterWide::writeFinalMark(
const NameAndTypePair & column, const NameAndTypePair & column,
WrittenOffsetColumns & offset_columns, WrittenOffsetColumns & offset_columns)
ISerialization::SubstreamPath & path)
{ {
writeSingleMark(column, offset_columns, 0, path); writeSingleMark(column, offset_columns, 0);
/// Memoize information about offsets /// Memoize information about offsets
data_part->getSerialization(column.name)->enumerateStreams([&] (const ISerialization::SubstreamPath & substream_path) data_part->getSerialization(column.name)->enumerateStreams([&] (const ISerialization::SubstreamPath & substream_path)
{ {
@ -631,7 +628,7 @@ void MergeTreeDataPartWriterWide::writeFinalMark(
String stream_name = ISerialization::getFileNameForStream(column, substream_path); String stream_name = ISerialization::getFileNameForStream(column, substream_path);
offset_columns.insert(stream_name); offset_columns.insert(stream_name);
} }
}, path); });
} }
static void fillIndexGranularityImpl( static void fillIndexGranularityImpl(
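Several writer helpers above drop the explicit ISerialization::SubstreamPath argument and call enumerateStreams with just a callback (and optionally a type); presumably the interface now constructs the path or settings internally. A minimal sketch of how such a convenience overload can delegate to the settings-taking one, with simplified stand-in types rather than the real ISerialization interface:

#include <functional>
#include <string>
#include <vector>

// Simplified stand-ins for the stream-enumeration interface.
using SubstreamPath = std::vector<std::string>;
using StreamCallback = std::function<void(const SubstreamPath &)>;

struct EnumerateStreamsSettings
{
    SubstreamPath path; // the path is now owned by the settings object
};

struct SerializationSketch
{
    // Full form: the caller controls the settings (and thus the path).
    void enumerateStreams(EnumerateStreamsSettings & settings, const StreamCallback & callback) const
    {
        settings.path.push_back("regular");
        callback(settings.path);
        settings.path.pop_back();
    }

    // Convenience form: default settings are created internally,
    // so call sites no longer declare a SubstreamPath of their own.
    void enumerateStreams(const StreamCallback & callback) const
    {
        EnumerateStreamsSettings settings;
        enumerateStreams(settings, callback);
    }
};

int main()
{
    SerializationSketch serialization;
    size_t streams = 0;
    serialization.enumerateStreams([&](const SubstreamPath &) { ++streams; });
    return streams == 1 ? 0 : 1;
}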


@ -61,8 +61,7 @@ private:
/// Take offsets from column and return as MarkInCompressed file with stream name /// Take offsets from column and return as MarkInCompressed file with stream name
StreamsWithMarks getCurrentMarksForColumn( StreamsWithMarks getCurrentMarksForColumn(
const NameAndTypePair & column, const NameAndTypePair & column,
WrittenOffsetColumns & offset_columns, WrittenOffsetColumns & offset_columns);
ISerialization::SubstreamPath & path);
/// Write mark to disk using stream and rows count /// Write mark to disk using stream and rows count
void flushMarkToFile( void flushMarkToFile(
@ -73,13 +72,11 @@ private:
void writeSingleMark( void writeSingleMark(
const NameAndTypePair & column, const NameAndTypePair & column,
WrittenOffsetColumns & offset_columns, WrittenOffsetColumns & offset_columns,
size_t number_of_rows, size_t number_of_rows);
ISerialization::SubstreamPath & path);
void writeFinalMark( void writeFinalMark(
const NameAndTypePair & column, const NameAndTypePair & column,
WrittenOffsetColumns & offset_columns, WrittenOffsetColumns & offset_columns);
ISerialization::SubstreamPath & path);
void addStreams( void addStreams(
const NameAndTypePair & column, const NameAndTypePair & column,


@ -46,35 +46,7 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
{ {
try try
{ {
size_t columns_num = columns_to_read.size(); fillColumnPositions();
column_positions.resize(columns_num);
read_only_offsets.resize(columns_num);
for (size_t i = 0; i < columns_num; ++i)
{
const auto & column_to_read = columns_to_read[i];
if (column_to_read.isSubcolumn())
{
auto storage_column_from_part = getColumnInPart(
{column_to_read.getNameInStorage(), column_to_read.getTypeInStorage()});
if (!storage_column_from_part.type->tryGetSubcolumnType(column_to_read.getSubcolumnName()))
continue;
}
auto position = data_part_info_for_read->getColumnPosition(column_to_read.getNameInStorage());
if (!position && typeid_cast<const DataTypeArray *>(column_to_read.type.get()))
{
/// If array of Nested column is missing in part,
/// we have to read its offsets if they exist.
position = findColumnForOffsets(column_to_read.name);
read_only_offsets[i] = (position != std::nullopt);
}
column_positions[i] = std::move(position);
}
/// Do not use max_read_buffer_size, but try to lower buffer size with maximal size of granule to avoid reading much data. /// Do not use max_read_buffer_size, but try to lower buffer size with maximal size of granule to avoid reading much data.
auto buffer_size = getReadBufferSize(*data_part_info_for_read, marks_loader, column_positions, all_mark_ranges); auto buffer_size = getReadBufferSize(*data_part_info_for_read, marks_loader, column_positions, all_mark_ranges);
@ -137,6 +109,44 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
} }
} }
void MergeTreeReaderCompact::fillColumnPositions()
{
size_t columns_num = columns_to_read.size();
column_positions.resize(columns_num);
read_only_offsets.resize(columns_num);
for (size_t i = 0; i < columns_num; ++i)
{
const auto & column_to_read = columns_to_read[i];
auto position = data_part_info_for_read->getColumnPosition(column_to_read.getNameInStorage());
bool is_array = isArray(column_to_read.type);
if (column_to_read.isSubcolumn())
{
auto storage_column_from_part = getColumnInPart(
{column_to_read.getNameInStorage(), column_to_read.getTypeInStorage()});
auto subcolumn_name = column_to_read.getSubcolumnName();
if (!storage_column_from_part.type->hasSubcolumn(subcolumn_name))
position.reset();
}
if (!position && is_array)
{
/// If array of Nested column is missing in part,
/// we have to read its offsets if they exist.
position = findColumnForOffsets(column_to_read);
read_only_offsets[i] = (position != std::nullopt);
}
column_positions[i] = std::move(position);
if (read_only_offsets[i])
partially_read_columns.insert(column_to_read.name);
}
}
size_t MergeTreeReaderCompact::readRows( size_t MergeTreeReaderCompact::readRows(
size_t from_mark, size_t current_task_last_mark, bool continue_reading, size_t max_rows_to_read, Columns & res_columns) size_t from_mark, size_t current_task_last_mark, bool continue_reading, size_t max_rows_to_read, Columns & res_columns)
{ {
@ -214,7 +224,8 @@ void MergeTreeReaderCompact::readData(
auto buffer_getter = [&](const ISerialization::SubstreamPath & substream_path) -> ReadBuffer * auto buffer_getter = [&](const ISerialization::SubstreamPath & substream_path) -> ReadBuffer *
{ {
if (only_offsets && (substream_path.size() != 1 || substream_path[0].type != ISerialization::Substream::ArraySizes)) bool is_offsets = !substream_path.empty() && substream_path.back().type == ISerialization::Substream::ArraySizes;
if (only_offsets && !is_offsets)
return nullptr; return nullptr;
return data_buffer; return data_buffer;


@ -39,6 +39,7 @@ public:
private: private:
bool isContinuousReading(size_t mark, size_t column_position); bool isContinuousReading(size_t mark, size_t column_position);
void fillColumnPositions();
ReadBuffer * data_buffer; ReadBuffer * data_buffer;
CompressedReadBufferBase * compressed_data_buffer; CompressedReadBufferBase * compressed_data_buffer;


@ -33,13 +33,19 @@ MergeTreeReaderInMemory::MergeTreeReaderInMemory(
{}) {})
, part_in_memory(std::move(data_part_)) , part_in_memory(std::move(data_part_))
{ {
for (const auto & [name, type] : columns_to_read) for (const auto & column_to_read : columns_to_read)
{ {
/// If array of Nested column is missing in part, /// If array of Nested column is missing in part,
/// we have to read its offsets if they exist. /// we have to read its offsets if they exist.
if (!part_in_memory->block.has(name) && typeid_cast<const DataTypeArray *>(type.get())) if (typeid_cast<const DataTypeArray *>(column_to_read.type.get())
if (auto offset_position = findColumnForOffsets(name)) && !tryGetColumnFromBlock(part_in_memory->block, column_to_read))
positions_for_offsets[name] = *offset_position; {
if (auto offsets_position = findColumnForOffsets(column_to_read))
{
positions_for_offsets[column_to_read.name] = *offsets_position;
partially_read_columns.insert(column_to_read.name);
}
}
} }
} }


@ -16,7 +16,6 @@ namespace DB
namespace namespace
{ {
using OffsetColumns = std::map<std::string, ColumnPtr>;
constexpr auto DATA_FILE_EXTENSION = ".bin"; constexpr auto DATA_FILE_EXTENSION = ".bin";
} }
@ -160,12 +159,18 @@ void MergeTreeReaderWide::addStreams(
const ReadBufferFromFileBase::ProfileCallback & profile_callback, const ReadBufferFromFileBase::ProfileCallback & profile_callback,
clockid_t clock_type) clockid_t clock_type)
{ {
bool has_any_stream = false;
bool has_all_streams = true;
ISerialization::StreamCallback callback = [&] (const ISerialization::SubstreamPath & substream_path) ISerialization::StreamCallback callback = [&] (const ISerialization::SubstreamPath & substream_path)
{ {
String stream_name = ISerialization::getFileNameForStream(name_and_type, substream_path); String stream_name = ISerialization::getFileNameForStream(name_and_type, substream_path);
if (streams.contains(stream_name)) if (streams.contains(stream_name))
{
has_any_stream = true;
return; return;
}
bool data_file_exists = data_part_info_for_read->getChecksums().files.contains(stream_name + DATA_FILE_EXTENSION); bool data_file_exists = data_part_info_for_read->getChecksums().files.contains(stream_name + DATA_FILE_EXTENSION);
@ -173,8 +178,12 @@ void MergeTreeReaderWide::addStreams(
* It is necessary since it allows to add new column to structure of the table without creating new files for old parts. * It is necessary since it allows to add new column to structure of the table without creating new files for old parts.
*/ */
if (!data_file_exists) if (!data_file_exists)
{
has_all_streams = false;
return; return;
}
has_any_stream = true;
bool is_lc_dict = substream_path.size() > 1 && substream_path[substream_path.size() - 2].type == ISerialization::Substream::Type::DictionaryKeys; bool is_lc_dict = substream_path.size() > 1 && substream_path[substream_path.size() - 2].type == ISerialization::Substream::Type::DictionaryKeys;
streams.emplace(stream_name, std::make_unique<MergeTreeReaderStream>( streams.emplace(stream_name, std::make_unique<MergeTreeReaderStream>(
@ -186,6 +195,9 @@ void MergeTreeReaderWide::addStreams(
}; };
serialization->enumerateStreams(callback); serialization->enumerateStreams(callback);
if (has_any_stream && !has_all_streams)
partially_read_columns.insert(name_and_type.name);
} }
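addStreams above now records whether a column had some but not all of its substreams on disk and, if so, marks it as partially read so fillMissingColumns can rebuild the missing parts later. A short standalone sketch of that classification, with the checksum file list replaced by a plain set of stream names (illustrative only, not the MergeTree reader API):

#include <cstdio>
#include <set>
#include <string>
#include <vector>

// Classify a column by which of its expected streams exist on disk.
enum class StreamCoverage { None, Partial, Full };

static StreamCoverage classify(
    const std::vector<std::string> & expected_streams,
    const std::set<std::string> & files_on_disk)
{
    bool has_any = false;
    bool has_all = true;
    for (const auto & stream : expected_streams)
    {
        if (files_on_disk.count(stream + ".bin"))
            has_any = true;
        else
            has_all = false;
    }
    if (!has_any)
        return StreamCoverage::None;
    return has_all ? StreamCoverage::Full : StreamCoverage::Partial;
}

int main()
{
    // An Array column whose offsets file survived but whose data file is missing.
    std::vector<std::string> expected = {"n.size0", "n.a"};
    std::set<std::string> on_disk = {"n.size0.bin"};

    auto coverage = classify(expected, on_disk);
    printf("coverage: %s\n", coverage == StreamCoverage::Partial ? "partial" : "other");
    return 0;
}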
@ -283,6 +295,7 @@ void MergeTreeReaderWide::readData(
/* seek_to_start = */false, substream_path, streams, name_and_type, from_mark, /* seek_to_start = */false, substream_path, streams, name_and_type, from_mark,
seek_to_mark, current_task_last_mark, cache); seek_to_mark, current_task_last_mark, cache);
}; };
deserialize_settings.continuous_reading = continue_reading; deserialize_settings.continuous_reading = continue_reading;
auto & deserialize_state = deserialize_binary_bulk_state_map[name_and_type.name]; auto & deserialize_state = deserialize_binary_bulk_state_map[name_and_type.name];


@ -1302,7 +1302,7 @@ private:
*ctx->source_part->data_part_storage, it->name(), destination); *ctx->source_part->data_part_storage, it->name(), destination);
hardlinked_files.insert(it->name()); hardlinked_files.insert(it->name());
} }
else if (!endsWith(".tmp_proj", it->name())) // ignore projection tmp merge dir else if (!endsWith(it->name(), ".tmp_proj")) // ignore projection tmp merge dir
{ {
// it's a projection part directory // it's a projection part directory
ctx->data_part_storage_builder->createProjection(destination); ctx->data_part_storage_builder->createProjection(destination);


@ -95,7 +95,7 @@ protected:
++name_and_type; ++name_and_type;
} }
fillMissingColumns(columns, src.rows(), column_names_and_types, /*metadata_snapshot=*/ nullptr); fillMissingColumns(columns, src.rows(), column_names_and_types, column_names_and_types, {}, nullptr);
assert(std::all_of(columns.begin(), columns.end(), [](const auto & column) { return column != nullptr; })); assert(std::all_of(columns.begin(), columns.end(), [](const auto & column) { return column != nullptr; }));
return Chunk(std::move(columns), src.rows()); return Chunk(std::move(columns), src.rows());


@ -1785,7 +1785,7 @@ void StorageMergeTree::backupData(BackupEntriesCollector & backup_entries_collec
for (const auto & data_part : data_parts) for (const auto & data_part : data_parts)
min_data_version = std::min(min_data_version, data_part->info.getDataVersion()); min_data_version = std::min(min_data_version, data_part->info.getDataVersion());
backup_entries_collector.addBackupEntries(backupParts(data_parts, data_path_in_backup)); backup_entries_collector.addBackupEntries(backupParts(data_parts, data_path_in_backup, local_context));
backup_entries_collector.addBackupEntries(backupMutations(min_data_version + 1, data_path_in_backup)); backup_entries_collector.addBackupEntries(backupMutations(min_data_version + 1, data_path_in_backup));
} }


@ -7336,6 +7336,21 @@ CheckResults StorageReplicatedMergeTree::checkData(const ASTPtr & query, Context
} }
bool StorageReplicatedMergeTree::canUseZeroCopyReplication() const
{
auto settings_ptr = getSettings();
if (!settings_ptr->allow_remote_fs_zero_copy_replication)
return false;
auto disks = getStoragePolicy()->getDisks();
for (const auto & disk : disks)
{
if (disk->supportZeroCopyReplication())
return true;
}
return false;
}
void StorageReplicatedMergeTree::checkBrokenDisks() void StorageReplicatedMergeTree::checkBrokenDisks()
{ {
auto disks = getStoragePolicy()->getDisks(); auto disks = getStoragePolicy()->getDisks();
@ -8290,7 +8305,7 @@ void StorageReplicatedMergeTree::backupData(
else else
data_parts = getVisibleDataPartsVector(local_context); data_parts = getVisibleDataPartsVector(local_context);
auto backup_entries = backupParts(data_parts, ""); auto backup_entries = backupParts(data_parts, /* data_path_in_backup */ "", local_context);
auto coordination = backup_entries_collector.getBackupCoordination(); auto coordination = backup_entries_collector.getBackupCoordination();
String shared_id = getTableSharedID(); String shared_id = getTableSharedID();


@ -327,6 +327,7 @@ public:
static bool removeSharedDetachedPart(DiskPtr disk, const String & path, const String & part_name, const String & table_uuid, static bool removeSharedDetachedPart(DiskPtr disk, const String & path, const String & part_name, const String & table_uuid,
const String & zookeeper_name, const String & replica_name, const String & zookeeper_path, ContextPtr local_context); const String & zookeeper_name, const String & replica_name, const String & zookeeper_path, ContextPtr local_context);
bool canUseZeroCopyReplication() const;
private: private:
std::atomic_bool are_restoring_replica {false}; std::atomic_bool are_restoring_replica {false};


@ -242,7 +242,7 @@ void StorageSystemPartsColumns::processNextStorage(
IDataType::forEachSubcolumn([&](const auto & subpath, const auto & name, const auto & data) IDataType::forEachSubcolumn([&](const auto & subpath, const auto & name, const auto & data)
{ {
/// We count only final subcolumns, which are represented by files on disk /// We count only final subcolumns, which are represented by files on disk
/// and skip intermediate suibcolumns of types Tuple and Nested. /// and skip intermediate subcolumns of types Tuple and Nested.
if (isTuple(data.type) || isNested(data.type)) if (isTuple(data.type) || isNested(data.type))
return; return;
@ -270,7 +270,7 @@ void StorageSystemPartsColumns::processNextStorage(
subcolumn_data_uncompressed_bytes.push_back(size.data_uncompressed); subcolumn_data_uncompressed_bytes.push_back(size.data_uncompressed);
subcolumn_marks_bytes.push_back(size.marks); subcolumn_marks_bytes.push_back(size.marks);
}, { serialization, column.type, nullptr, nullptr }); }, ISerialization::SubstreamData(serialization).withType(column.type));
if (columns_mask[src_index++]) if (columns_mask[src_index++])
columns[res_index++]->insert(subcolumn_names); columns[res_index++]->insert(subcolumn_names);


@ -17,7 +17,7 @@ from env_helper import (
from s3_helper import S3Helper from s3_helper import S3Helper
from get_robot_token import get_best_robot_token from get_robot_token import get_best_robot_token
from pr_info import PRInfo from pr_info import PRInfo
from build_download_helper import get_build_name_for_check, get_build_urls from build_download_helper import get_build_name_for_check, read_build_urls
from docker_pull_helper import get_image_with_version from docker_pull_helper import get_image_with_version
from commit_status_helper import post_commit_status from commit_status_helper import post_commit_status
from clickhouse_helper import ClickHouseHelper, prepare_tests_results_for_clickhouse from clickhouse_helper import ClickHouseHelper, prepare_tests_results_for_clickhouse
@ -29,7 +29,11 @@ IMAGE_NAME = "clickhouse/fuzzer"
def get_run_command(pr_number, sha, download_url, workspace_path, image): def get_run_command(pr_number, sha, download_url, workspace_path, image):
return ( return (
f"docker run --network=host --volume={workspace_path}:/workspace " f"docker run "
# For sysctl
"--privileged "
"--network=host "
f"--volume={workspace_path}:/workspace "
"--cap-add syslog --cap-add sys_admin --cap-add=SYS_PTRACE " "--cap-add syslog --cap-add sys_admin --cap-add=SYS_PTRACE "
f'-e PR_TO_TEST={pr_number} -e SHA_TO_TEST={sha} -e BINARY_URL_TO_DOWNLOAD="{download_url}" ' f'-e PR_TO_TEST={pr_number} -e SHA_TO_TEST={sha} -e BINARY_URL_TO_DOWNLOAD="{download_url}" '
f"{image}" f"{image}"
@ -69,7 +73,7 @@ if __name__ == "__main__":
build_name = get_build_name_for_check(check_name) build_name = get_build_name_for_check(check_name)
print(build_name) print(build_name)
urls = get_build_urls(build_name, reports_path) urls = read_build_urls(build_name, reports_path)
if not urls: if not urls:
raise Exception("No build URLs found") raise Exception("No build URLs found")


@ -1,11 +1,11 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import os
import json import json
import logging import logging
import os
import sys import sys
import time import time
from typing import Optional from typing import List, Optional
import requests # type: ignore import requests # type: ignore
@ -41,11 +41,11 @@ def get_with_retries(
return response return response
def get_build_name_for_check(check_name): def get_build_name_for_check(check_name) -> str:
return CI_CONFIG["tests_config"][check_name]["required_build"] return CI_CONFIG["tests_config"][check_name]["required_build"]
def get_build_urls(build_name, reports_path): def read_build_urls(build_name, reports_path) -> List[str]:
for root, _, files in os.walk(reports_path): for root, _, files in os.walk(reports_path):
for f in files: for f in files:
if build_name in f: if build_name in f:
@ -56,7 +56,7 @@ def get_build_urls(build_name, reports_path):
return [] return []
def dowload_build_with_progress(url, path): def download_build_with_progress(url, path):
logging.info("Downloading from %s to temp path %s", url, path) logging.info("Downloading from %s to temp path %s", url, path)
for i in range(DOWNLOAD_RETRIES_COUNT): for i in range(DOWNLOAD_RETRIES_COUNT):
try: try:
@ -104,14 +104,14 @@ def download_builds(result_path, build_urls, filter_fn):
if filter_fn(url): if filter_fn(url):
fname = os.path.basename(url.replace("%2B", "+").replace("%20", " ")) fname = os.path.basename(url.replace("%2B", "+").replace("%20", " "))
logging.info("Will download %s to %s", fname, result_path) logging.info("Will download %s to %s", fname, result_path)
dowload_build_with_progress(url, os.path.join(result_path, fname)) download_build_with_progress(url, os.path.join(result_path, fname))
def download_builds_filter( def download_builds_filter(
check_name, reports_path, result_path, filter_fn=lambda _: True check_name, reports_path, result_path, filter_fn=lambda _: True
): ):
build_name = get_build_name_for_check(check_name) build_name = get_build_name_for_check(check_name)
urls = get_build_urls(build_name, reports_path) urls = read_build_urls(build_name, reports_path)
print(urls) print(urls)
if not urls: if not urls:
tests/ci/download_binary.py (new executable file, 79 lines added)
@ -0,0 +1,79 @@
#!/usr/bin/env python
"""
This file is needed to avoid a circular import between build_download_helper.py and env_helper.py
"""
import argparse
import logging
import os
from build_download_helper import download_build_with_progress
from ci_config import CI_CONFIG, BuildConfig
from env_helper import RUNNER_TEMP, S3_ARTIFACT_DOWNLOAD_TEMPLATE
from git_helper import Git, commit
from version_helper import get_version_from_repo, version_arg
TEMP_PATH = os.path.join(RUNNER_TEMP, "download_binary")
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
description="Script to download binary artifacts from S3. Downloaded artifacts "
"are renamed to clickhouse-{static_binary_name}",
)
parser.add_argument(
"--version",
type=version_arg,
default=get_version_from_repo().string,
help="a version to generate a download url, get from the repo by default",
)
parser.add_argument(
"--commit",
type=commit,
default=Git(True).sha,
help="a version to generate a download url, get from the repo by default",
)
parser.add_argument("--rename", default=True, help=argparse.SUPPRESS)
parser.add_argument(
"--no-rename",
dest="rename",
action="store_false",
default=argparse.SUPPRESS,
help="if set, the downloaded binary won't be renamed to "
"clickhouse-{static_binary_name}, makes sense only for a single build name",
)
parser.add_argument(
"build_names",
nargs="+",
help="the build names to download",
)
args = parser.parse_args()
if not args.rename and len(args.build_names) > 1:
parser.error("`--no-rename` shouldn't be used with more than one build name")
return args
def main():
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")
args = parse_args()
os.makedirs(TEMP_PATH, exist_ok=True)
for build in args.build_names:
# the lookup below doubles as a check that the build name exists in CI_CONFIG
config = CI_CONFIG["build_config"][build] # type: BuildConfig
if args.rename:
path = os.path.join(TEMP_PATH, f"clickhouse-{config['static_binary_name']}")
else:
path = os.path.join(TEMP_PATH, "clickhouse")
url = S3_ARTIFACT_DOWNLOAD_TEMPLATE.format(
pr_or_release=f"{args.version.major}.{args.version.minor}",
commit=args.commit,
build_name=build,
artifact="clickhouse",
)
download_build_with_progress(url, path)
if __name__ == "__main__":
main()
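A hedged usage sketch for the new script; the build name below is only an example and is assumed to exist in CI_CONFIG["build_config"]:
# Hedged sketch: download one binary without the clickhouse-{static_binary_name}
# rename; the file then ends up as $RUNNER_TEMP/download_binary/clickhouse.
import subprocess

subprocess.check_call(
    ["python3", "tests/ci/download_binary.py", "--no-rename", "binary_darwin"]
)
Note that parse_args() rejects --no-rename when more than one build name is given.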
@ -22,10 +22,14 @@ IMAGES_PATH = os.getenv("IMAGES_PATH", TEMP_PATH)
REPORTS_PATH = os.getenv("REPORTS_PATH", p.abspath(p.join(module_dir, "./reports"))) REPORTS_PATH = os.getenv("REPORTS_PATH", p.abspath(p.join(module_dir, "./reports")))
REPO_COPY = os.getenv("REPO_COPY", git_root) REPO_COPY = os.getenv("REPO_COPY", git_root)
RUNNER_TEMP = os.getenv("RUNNER_TEMP", p.abspath(p.join(module_dir, "./tmp"))) RUNNER_TEMP = os.getenv("RUNNER_TEMP", p.abspath(p.join(module_dir, "./tmp")))
S3_URL = os.getenv("S3_URL", "https://s3.amazonaws.com")
S3_DOWNLOAD = os.getenv("S3_DOWNLOAD", S3_URL)
S3_BUILDS_BUCKET = os.getenv("S3_BUILDS_BUCKET", "clickhouse-builds") S3_BUILDS_BUCKET = os.getenv("S3_BUILDS_BUCKET", "clickhouse-builds")
S3_TEST_REPORTS_BUCKET = os.getenv("S3_TEST_REPORTS_BUCKET", "clickhouse-test-reports") S3_TEST_REPORTS_BUCKET = os.getenv("S3_TEST_REPORTS_BUCKET", "clickhouse-test-reports")
S3_URL = os.getenv("S3_URL", "https://s3.amazonaws.com")
S3_DOWNLOAD = os.getenv("S3_DOWNLOAD", S3_URL)
S3_ARTIFACT_DOWNLOAD_TEMPLATE = (
f"{S3_DOWNLOAD}/{S3_BUILDS_BUCKET}/"
"{pr_or_release}/{commit}/{build_name}/{artifact}"
)
# These parameters are set only on demand, and only once # These parameters are set only on demand, and only once
_GITHUB_JOB_ID = "" _GITHUB_JOB_ID = ""
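A minimal sketch (all values hypothetical) of how the new template expands; with the default S3_DOWNLOAD and S3_BUILDS_BUCKET the result points at the clickhouse-builds bucket on s3.amazonaws.com:
# Hedged sketch: a release prefix (or a PR number), a commit SHA, a build name,
# and the artifact file name fill the shared download template.
url = S3_ARTIFACT_DOWNLOAD_TEMPLATE.format(
    pr_or_release="22.9",                                # or e.g. "41234" for a pull request
    commit="0123456789abcdef0123456789abcdef01234567",   # hypothetical commit SHA
    build_name="package_release",                        # hypothetical build name
    artifact="clickhouse",
)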
@ -8,8 +8,8 @@ from collections import namedtuple
from typing import Dict, List, Tuple from typing import Dict, List, Tuple
from artifactory import ArtifactorySaaSPath # type: ignore from artifactory import ArtifactorySaaSPath # type: ignore
from build_download_helper import dowload_build_with_progress from build_download_helper import download_build_with_progress
from env_helper import RUNNER_TEMP, S3_BUILDS_BUCKET, S3_DOWNLOAD from env_helper import S3_ARTIFACT_DOWNLOAD_TEMPLATE, RUNNER_TEMP
from git_helper import TAG_REGEXP, commit, removeprefix, removesuffix from git_helper import TAG_REGEXP, commit, removeprefix, removesuffix
@ -97,18 +97,6 @@ class Packages:
class S3: class S3:
template = (
f"{S3_DOWNLOAD}/"
# "clickhouse-builds/"
f"{S3_BUILDS_BUCKET}/"
# "33333/" or "21.11/" from --release, if pull request is omitted
"{pr}/"
# "2bef313f75e4cacc6ea2ef2133e8849ecf0385ec/"
"{commit}/"
# "package_release/clickhouse-common-static_21.11.5.0_amd64.deb"
"{s3_path_suffix}"
)
def __init__( def __init__(
self, self,
pr: int, pr: int,
@ -117,7 +105,7 @@ class S3:
force_download: bool, force_download: bool,
): ):
self._common = dict( self._common = dict(
pr=pr, pr_or_release=pr,
commit=commit, commit=commit,
) )
self.force_download = force_download self.force_download = force_download
@ -133,18 +121,19 @@ class S3:
self.packages.replace_with_fallback(package_file) self.packages.replace_with_fallback(package_file)
return return
url = self.template.format_map( build_name, artifact = s3_path_suffix.split("/")
{**self._common, "s3_path_suffix": s3_path_suffix} url = S3_ARTIFACT_DOWNLOAD_TEMPLATE.format_map(
{**self._common, "build_name": build_name, "artifact": artifact}
) )
try: try:
dowload_build_with_progress(url, path) download_build_with_progress(url, path)
except Exception as e: except Exception as e:
if "Cannot download dataset from" in e.args[0]: if "Cannot download dataset from" in e.args[0]:
new_url = Packages.fallback_to_all(url) new_url = Packages.fallback_to_all(url)
logging.warning( logging.warning(
"Fallback downloading %s for old release", fallback_path "Fallback downloading %s for old release", fallback_path
) )
dowload_build_with_progress(new_url, fallback_path) download_build_with_progress(new_url, fallback_path)
self.packages.replace_with_fallback(package_file) self.packages.replace_with_fallback(package_file)
def download_deb(self): def download_deb(self):
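To make the rewritten download step above concrete, here is a hedged sketch with hypothetical values of how the old {s3_path_suffix} placeholder is now split into build_name and artifact before the shared template is filled:
# Hedged sketch: the "build_name/artifact" suffix is split in two and combined
# with the pr_or_release/commit pair shared by all downloads of one release.
s3_path_suffix = "package_release/clickhouse-common-static_22.9.1.1_amd64.deb"
build_name, artifact = s3_path_suffix.split("/")
url = S3_ARTIFACT_DOWNLOAD_TEMPLATE.format_map(
    {
        "pr_or_release": "22.9",
        "commit": "0123456789abcdef0123456789abcdef01234567",
        "build_name": build_name,
        "artifact": artifact,
    }
)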
@ -33,7 +33,7 @@ def get_run_command(
"docker run --cap-add=SYS_PTRACE " "docker run --cap-add=SYS_PTRACE "
# a static link, don't use S3_URL or S3_DOWNLOAD # a static link, don't use S3_URL or S3_DOWNLOAD
"-e S3_URL='https://s3.amazonaws.com/clickhouse-datasets' " "-e S3_URL='https://s3.amazonaws.com/clickhouse-datasets' "
# For dmesg # For dmesg and sysctl
"--privileged " "--privileged "
f"--volume={build_path}:/package_folder " f"--volume={build_path}:/package_folder "
f"--volume={result_folder}:/test_output " f"--volume={result_folder}:/test_output "
@ -20,7 +20,7 @@ const char * auto_contributors[] {{
VERSIONS = Dict[str, Union[int, str]] VERSIONS = Dict[str, Union[int, str]]
VERSIONS_TEMPLATE = """# This variables autochanged by release_lib.sh: VERSIONS_TEMPLATE = """# These variables are autochanged by tests/ci/version_helper.py:
# NOTE: has nothing common with DBMS_TCP_PROTOCOL_VERSION, # NOTE: has nothing common with DBMS_TCP_PROTOCOL_VERSION,
# only DBMS_TCP_PROTOCOL_VERSION should be incremented on protocol changes. # only DBMS_TCP_PROTOCOL_VERSION should be incremented on protocol changes.
@ -29,7 +29,6 @@ def generate_cluster_def():
main_configs = ["configs/backups_disk.xml", generate_cluster_def()] main_configs = ["configs/backups_disk.xml", generate_cluster_def()]
user_configs = ["configs/allow_database_types.xml"] user_configs = ["configs/allow_database_types.xml"]
nodes = [] nodes = []
@ -175,11 +174,21 @@ def test_concurrent_backups_on_different_nodes():
@pytest.mark.parametrize( @pytest.mark.parametrize(
"db_engine, table_engine", "db_engine, table_engine",
[("Replicated", "ReplicatedMergeTree"), ("Ordinary", "MergeTree")], [
("Ordinary", "MergeTree"),
("Atomic", "MergeTree"),
("Replicated", "ReplicatedMergeTree"),
("Memory", "MergeTree"),
("Lazy", "Log"),
],
) )
def test_create_or_drop_tables_during_backup(db_engine, table_engine): def test_create_or_drop_tables_during_backup(db_engine, table_engine):
if db_engine == "Replicated": if db_engine == "Replicated":
db_engine = "Replicated('/clickhouse/path/','{shard}','{replica}')" db_engine = "Replicated('/clickhouse/path/','{shard}','{replica}')"
if db_engine == "Lazy":
db_engine = "Lazy(20)"
if table_engine.endswith("MergeTree"): if table_engine.endswith("MergeTree"):
table_engine += " ORDER BY tuple()" table_engine += " ORDER BY tuple()"
@ -189,7 +198,7 @@ def test_create_or_drop_tables_during_backup(db_engine, table_engine):
start_time = time.time() start_time = time.time()
end_time = start_time + 60 end_time = start_time + 60
def create_table(): def create_tables():
while time.time() < end_time: while time.time() < end_time:
node = nodes[randint(0, num_nodes - 1)] node = nodes[randint(0, num_nodes - 1)]
table_name = f"mydb.tbl{randint(1, num_nodes)}" table_name = f"mydb.tbl{randint(1, num_nodes)}"
@ -200,13 +209,13 @@ def test_create_or_drop_tables_during_backup(db_engine, table_engine):
f"INSERT INTO {table_name} SELECT rand32() FROM numbers(10)" f"INSERT INTO {table_name} SELECT rand32() FROM numbers(10)"
) )
def drop_table(): def drop_tables():
while time.time() < end_time: while time.time() < end_time:
table_name = f"mydb.tbl{randint(1, num_nodes)}" table_name = f"mydb.tbl{randint(1, num_nodes)}"
node = nodes[randint(0, num_nodes - 1)] node = nodes[randint(0, num_nodes - 1)]
node.query(f"DROP TABLE IF EXISTS {table_name} NO DELAY") node.query(f"DROP TABLE IF EXISTS {table_name} NO DELAY")
def rename_table(): def rename_tables():
while time.time() < end_time: while time.time() < end_time:
table_name1 = f"mydb.tbl{randint(1, num_nodes)}" table_name1 = f"mydb.tbl{randint(1, num_nodes)}"
table_name2 = f"mydb.tbl{randint(1, num_nodes)}" table_name2 = f"mydb.tbl{randint(1, num_nodes)}"
@ -215,7 +224,13 @@ def test_create_or_drop_tables_during_backup(db_engine, table_engine):
f"RENAME TABLE {table_name1} TO {table_name2}" f"RENAME TABLE {table_name1} TO {table_name2}"
) )
def make_backup(): def truncate_tables():
while time.time() < end_time:
table_name = f"mydb.tbl{randint(1, num_nodes)}"
node = nodes[randint(0, num_nodes - 1)]
node.query(f"TRUNCATE TABLE IF EXISTS {table_name} NO DELAY")
def make_backups():
ids = [] ids = []
while time.time() < end_time: while time.time() < end_time:
time.sleep( time.sleep(
@ -231,11 +246,12 @@ def test_create_or_drop_tables_during_backup(db_engine, table_engine):
ids = [] ids = []
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
futures = [] futures = []
ids_future = executor.submit(make_backup) ids_future = executor.submit(make_backups)
futures.append(ids_future) futures.append(ids_future)
futures.append(executor.submit(create_table)) futures.append(executor.submit(create_tables))
futures.append(executor.submit(drop_table)) futures.append(executor.submit(drop_tables))
futures.append(executor.submit(rename_table)) futures.append(executor.submit(rename_tables))
futures.append(executor.submit(truncate_tables))
for future in futures: for future in futures:
future.result() future.result()
ids = ids_future.result() ids = ids_future.result()
@ -5,13 +5,15 @@ DROP TABLE IF EXISTS restore_01640;
CREATE TABLE test_01640(i Int64, d Date, s String) CREATE TABLE test_01640(i Int64, d Date, s String)
ENGINE = ReplicatedMergeTree('/clickhouse/{database}/{shard}/tables/test_01640','{replica}') ENGINE = ReplicatedMergeTree('/clickhouse/{database}/{shard}/tables/test_01640','{replica}')
PARTITION BY toYYYYMM(d) ORDER BY i; PARTITION BY toYYYYMM(d) ORDER BY i
SETTINGS allow_remote_fs_zero_copy_replication=0;
insert into test_01640 values (1, '2021-01-01','some'); insert into test_01640 values (1, '2021-01-01','some');
CREATE TABLE restore_01640(i Int64, d Date, s String) CREATE TABLE restore_01640(i Int64, d Date, s String)
ENGINE = ReplicatedMergeTree('/clickhouse/{database}/{shard}/tables/restore_01640','{replica}') ENGINE = ReplicatedMergeTree('/clickhouse/{database}/{shard}/tables/restore_01640','{replica}')
PARTITION BY toYYYYMM(d) ORDER BY i; PARTITION BY toYYYYMM(d) ORDER BY i
SETTINGS allow_remote_fs_zero_copy_replication=0;
ALTER TABLE restore_01640 FETCH PARTITION tuple(toYYYYMM(toDate('2021-01-01'))) ALTER TABLE restore_01640 FETCH PARTITION tuple(toYYYYMM(toDate('2021-01-01')))
FROM '/clickhouse/{database}/{shard}/tables/test_01640'; FROM '/clickhouse/{database}/{shard}/tables/test_01640';
@ -0,0 +1,27 @@
Tuple(arr Nested(k1 Nested(k2 String, k3 String, k4 Int8), k5 Tuple(k6 String)), id Int8)
{"obj":{"arr":[{"k1":[{"k2":"aaa","k3":"bbb","k4":0},{"k2":"ccc","k3":"","k4":0}],"k5":{"k6":""}}],"id":1}}
{"obj":{"arr":[{"k1":[{"k2":"","k3":"ddd","k4":10},{"k2":"","k3":"","k4":20}],"k5":{"k6":"foo"}}],"id":2}}
[['bbb','']] [['aaa','ccc']]
[['ddd','']] [['','']]
1
[[0,0]]
[[10,20]]
Tuple(arr Nested(k1 Nested(k2 String, k3 Nested(k4 Int8))), id Int8)
{"obj":{"arr":[{"k1":[{"k2":"aaa","k3":[]}]}],"id":1}}
{"obj":{"arr":[{"k1":[{"k2":"bbb","k3":[{"k4":10}]},{"k2":"ccc","k3":[{"k4":20}]}]}],"id":2}}
[['aaa']] [[[]]]
[['bbb','ccc']] [[[10],[20]]]
1
[[[]]]
[[[10],[20]]]
Tuple(arr Nested(k1 Nested(k2 String, k4 Nested(k5 Int8)), k3 String), id Int8)
{"obj":{"arr":[{"k1":[],"k3":"qqq"},{"k1":[],"k3":"www"}],"id":1}}
{"obj":{"arr":[{"k1":[{"k2":"aaa","k4":[]}],"k3":"eee"}],"id":2}}
{"obj":{"arr":[{"k1":[{"k2":"bbb","k4":[{"k5":10}]},{"k2":"ccc","k4":[{"k5":20}]}],"k3":"rrr"}],"id":3}}
['qqq','www'] [[],[]] [[],[]]
['eee'] [['aaa']] [[[]]]
['rrr'] [['bbb','ccc']] [[[10],[20]]]
1
[[],[]]
[[[]]]
[[[10],[20]]]
@ -0,0 +1,48 @@
-- Tags: no-fasttest
DROP TABLE IF EXISTS t_json_17;
SET allow_experimental_object_type = 1;
SET output_format_json_named_tuples_as_objects = 1;
CREATE TABLE t_json_17(obj JSON)
ENGINE = MergeTree ORDER BY tuple();
DROP FUNCTION IF EXISTS hasValidSizes17;
CREATE FUNCTION hasValidSizes17 AS (arr1, arr2) -> length(arr1) = length(arr2) AND arrayAll((x, y) -> length(x) = length(y), arr1, arr2);
SYSTEM STOP MERGES t_json_17;
INSERT INTO t_json_17 FORMAT JSONAsObject {"id": 1, "arr": [{"k1": [{"k2": "aaa", "k3": "bbb"}, {"k2": "ccc"}]}]}
INSERT INTO t_json_17 FORMAT JSONAsObject {"id": 2, "arr": [{"k1": [{"k3": "ddd", "k4": 10}, {"k4": 20}], "k5": {"k6": "foo"}}]}
SELECT toTypeName(obj) FROM t_json_17 LIMIT 1;
SELECT obj FROM t_json_17 ORDER BY obj.id FORMAT JSONEachRow;
SELECT obj.arr.k1.k3, obj.arr.k1.k2 FROM t_json_17 ORDER BY obj.id;
SELECT sum(hasValidSizes17(obj.arr.k1.k3, obj.arr.k1.k2)) == count() FROM t_json_17;
SELECT obj.arr.k1.k4 FROM t_json_17 ORDER BY obj.id;
TRUNCATE TABLE t_json_17;
INSERT INTO t_json_17 FORMAT JSONAsObject {"id": 1, "arr": [{"k1": [{"k2": "aaa"}]}]}
INSERT INTO t_json_17 FORMAT JSONAsObject {"id": 2, "arr": [{"k1": [{"k2": "bbb", "k3": [{"k4": 10}]}, {"k2": "ccc", "k3": [{"k4": 20}]}]}]}
SELECT toTypeName(obj) FROM t_json_17 LIMIT 1;
SELECT obj FROM t_json_17 ORDER BY obj.id FORMAT JSONEachRow;
SELECT obj.arr.k1.k2, obj.arr.k1.k3.k4 FROM t_json_17 ORDER BY obj.id;
SELECT sum(hasValidSizes17(obj.arr.k1.k2, obj.arr.k1.k3.k4)) == count() FROM t_json_17;
SELECT obj.arr.k1.k3.k4 FROM t_json_17 ORDER BY obj.id;
TRUNCATE TABLE t_json_17;
INSERT INTO t_json_17 FORMAT JSONAsObject {"id": 1, "arr": [{"k3": "qqq"}, {"k3": "www"}]}
INSERT INTO t_json_17 FORMAT JSONAsObject {"id": 2, "arr": [{"k1": [{"k2": "aaa"}], "k3": "eee"}]}
INSERT INTO t_json_17 FORMAT JSONAsObject {"id": 3, "arr": [{"k1": [{"k2": "bbb", "k4": [{"k5": 10}]}, {"k2": "ccc", "k4": [{"k5": 20}]}], "k3": "rrr"}]}
SELECT toTypeName(obj) FROM t_json_17 LIMIT 1;
SELECT obj FROM t_json_17 ORDER BY obj.id FORMAT JSONEachRow;
SELECT obj.arr.k3, obj.arr.k1.k2, obj.arr.k1.k4.k5 FROM t_json_17 ORDER BY obj.id;
SELECT sum(hasValidSizes17(obj.arr.k1.k2, obj.arr.k1.k4.k5)) == count() FROM t_json_17;
SELECT obj.arr.k1.k4.k5 FROM t_json_17 ORDER BY obj.id;
DROP FUNCTION hasValidSizes17;
DROP TABLE t_json_17;