mirror of https://github.com/ClickHouse/ClickHouse.git, synced 2024-11-21 15:12:02 +00:00

Commit 34c2c4bb52: Merge branch 'master' into lightweight-delete

.github/workflows/backport_branches.yml (vendored, 135 changed lines)
@@ -349,6 +349,100 @@ jobs:
          # shellcheck disable=SC2046
          docker rm -f $(docker ps -a -q) ||:
          sudo rm -fr "$TEMP_PATH" "$CACHES_PATH"
  BuilderBinDarwin:
    needs: [DockerHubPush]
    runs-on: [self-hosted, builder]
    steps:
      - name: Set envs
        run: |
          cat >> "$GITHUB_ENV" << 'EOF'
          TEMP_PATH=${{runner.temp}}/build_check
          IMAGES_PATH=${{runner.temp}}/images_path
          REPO_COPY=${{runner.temp}}/build_check/ClickHouse
          CACHES_PATH=${{runner.temp}}/../ccaches
          BUILD_NAME=binary_darwin
          EOF
      - name: Download changed images
        uses: actions/download-artifact@v2
        with:
          name: changed_images
          path: ${{ env.IMAGES_PATH }}
      - name: Clear repository
        run: |
          sudo rm -fr "$GITHUB_WORKSPACE" && mkdir "$GITHUB_WORKSPACE"
      - name: Check out repository code
        uses: actions/checkout@v2
        with:
          fetch-depth: 0 # otherwise we will have no info about contributors
      - name: Build
        run: |
          git -C "$GITHUB_WORKSPACE" submodule sync --recursive
          git -C "$GITHUB_WORKSPACE" submodule update --depth=1 --recursive --init --jobs=10
          sudo rm -fr "$TEMP_PATH"
          mkdir -p "$TEMP_PATH"
          cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
          cd "$REPO_COPY/tests/ci" && python3 build_check.py "$BUILD_NAME"
      - name: Upload build URLs to artifacts
        if: ${{ success() || failure() }}
        uses: actions/upload-artifact@v2
        with:
          name: ${{ env.BUILD_URLS }}
          path: ${{ env.TEMP_PATH }}/${{ env.BUILD_URLS }}.json
      - name: Cleanup
        if: always()
        run: |
          # shellcheck disable=SC2046
          docker kill $(docker ps -q) ||:
          # shellcheck disable=SC2046
          docker rm -f $(docker ps -a -q) ||:
          sudo rm -fr "$TEMP_PATH" "$CACHES_PATH"
  BuilderBinDarwinAarch64:
    needs: [DockerHubPush]
    runs-on: [self-hosted, builder]
    steps:
      - name: Set envs
        run: |
          cat >> "$GITHUB_ENV" << 'EOF'
          TEMP_PATH=${{runner.temp}}/build_check
          IMAGES_PATH=${{runner.temp}}/images_path
          REPO_COPY=${{runner.temp}}/build_check/ClickHouse
          CACHES_PATH=${{runner.temp}}/../ccaches
          BUILD_NAME=binary_darwin_aarch64
          EOF
      - name: Download changed images
        uses: actions/download-artifact@v2
        with:
          name: changed_images
          path: ${{ env.IMAGES_PATH }}
      - name: Clear repository
        run: |
          sudo rm -fr "$GITHUB_WORKSPACE" && mkdir "$GITHUB_WORKSPACE"
      - name: Check out repository code
        uses: actions/checkout@v2
        with:
          fetch-depth: 0 # otherwise we will have no info about contributors
      - name: Build
        run: |
          git -C "$GITHUB_WORKSPACE" submodule sync --recursive
          git -C "$GITHUB_WORKSPACE" submodule update --depth=1 --recursive --init --jobs=10
          sudo rm -fr "$TEMP_PATH"
          mkdir -p "$TEMP_PATH"
          cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
          cd "$REPO_COPY/tests/ci" && python3 build_check.py "$BUILD_NAME"
      - name: Upload build URLs to artifacts
        if: ${{ success() || failure() }}
        uses: actions/upload-artifact@v2
        with:
          name: ${{ env.BUILD_URLS }}
          path: ${{ env.TEMP_PATH }}/${{ env.BUILD_URLS }}.json
      - name: Cleanup
        if: always()
        run: |
          # shellcheck disable=SC2046
          docker kill $(docker ps -q) ||:
          # shellcheck disable=SC2046
          docker rm -f $(docker ps -a -q) ||:
          sudo rm -fr "$TEMP_PATH" "$CACHES_PATH"
############################################################################################
##################################### Docker images #######################################
############################################################################################
@@ -425,6 +519,46 @@ jobs:
          # shellcheck disable=SC2046
          docker rm -f $(docker ps -a -q) ||:
          sudo rm -fr "$TEMP_PATH"
  BuilderSpecialReport:
    needs:
      - BuilderBinDarwin
      - BuilderBinDarwinAarch64
    runs-on: [self-hosted, style-checker]
    steps:
      - name: Set envs
        run: |
          cat >> "$GITHUB_ENV" << 'EOF'
          TEMP_PATH=${{runner.temp}}/report_check
          REPORTS_PATH=${{runner.temp}}/reports_dir
          CHECK_NAME=ClickHouse special build check
          NEEDS_DATA_PATH=${{runner.temp}}/needs.json
          EOF
      - name: Download json reports
        uses: actions/download-artifact@v2
        with:
          path: ${{ env.REPORTS_PATH }}
      - name: Clear repository
        run: |
          sudo rm -fr "$GITHUB_WORKSPACE" && mkdir "$GITHUB_WORKSPACE"
      - name: Check out repository code
        uses: actions/checkout@v2
      - name: Report Builder
        run: |
          sudo rm -fr "$TEMP_PATH"
          mkdir -p "$TEMP_PATH"
          cat > "$NEEDS_DATA_PATH" << 'EOF'
          ${{ toJSON(needs) }}
          EOF
          cd "$GITHUB_WORKSPACE/tests/ci"
          python3 build_report_check.py "$CHECK_NAME"
      - name: Cleanup
        if: always()
        run: |
          # shellcheck disable=SC2046
          docker kill $(docker ps -q) ||:
          # shellcheck disable=SC2046
          docker rm -f $(docker ps -a -q) ||:
          sudo rm -fr "$TEMP_PATH"
##############################################################################################
########################### FUNCTIONAL STATELESS TESTS #######################################
##############################################################################################
@@ -592,6 +726,7 @@ jobs:
      - DockerHubPush
      - DockerServerImages
      - BuilderReport
      - BuilderSpecialReport
      - FunctionalStatelessTestAsan
      - FunctionalStatefulTestDebug
      - StressTestTsan
.github/workflows/release.yml (vendored, 4 changed lines)

@@ -29,8 +29,12 @@ jobs:
          rm -rf "$TEMP_PATH" && mkdir -p "$TEMP_PATH"
          cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
          cd "$REPO_COPY"
          # Download and push packages to artifactory
          python3 ./tests/ci/push_to_artifactory.py --release "${{ github.ref }}" \
            --commit '${{ github.sha }}' --artifactory-url "${{ secrets.JFROG_ARTIFACTORY_URL }}" --all
          # Download macos binaries to ${{runner.temp}}/download_binary
          python3 ./tests/ci/download_binary.py binary_darwin binary_darwin_aarch64
          mv '${{runner.temp}}/download_binary/'clickhouse-* '${{runner.temp}}/push_to_artifactory'
      - name: Upload packages to release assets
        uses: svenstaro/upload-release-action@v2
        with:
.github/workflows/release_branches.yml (vendored, 135 changed lines)

@@ -426,6 +426,100 @@ jobs:
The same BuilderBinDarwin and BuilderBinDarwinAarch64 jobs shown above for backport_branches.yml are added here verbatim (context lines: the cleanup commands of the preceding job, then the two Darwin build jobs, followed by the Docker images banner).

@@ -505,6 +599,46 @@ jobs:
The same BuilderSpecialReport job shown above for backport_branches.yml is added here verbatim (context lines: the cleanup commands of the preceding job, then the report job, followed by the FUNCTIONAL STATELESS TESTS banner).

@@ -1847,6 +1981,7 @@ jobs:
      - DockerHubPush
      - DockerServerImages
      - BuilderReport
      - BuilderSpecialReport
      - FunctionalStatelessTestDebug0
      - FunctionalStatelessTestDebug1
      - FunctionalStatelessTestDebug2
@@ -1,8 +1,15 @@
#!/bin/bash
# shellcheck disable=SC2086,SC2001,SC2046,SC2030,SC2031

set -eux
set -x

# core.COMM.PID-TID
sysctl kernel.core_pattern='core.%e.%p-%P'

set -e
set -u
set -o pipefail

trap "exit" INT TERM
# The watchdog is in the separate process group, so we have to kill it separately
# if the script terminates earlier.
@@ -87,6 +94,19 @@ function configure
    # TODO figure out which ones are needed
    cp -av --dereference "$repo_dir"/tests/config/config.d/listen.xml db/config.d
    cp -av --dereference "$script_dir"/query-fuzzer-tweaks-users.xml db/users.d

    cat > db/config.d/core.xml <<EOL
<clickhouse>
    <core_dump>
        <!-- 100GiB -->
        <size_limit>107374182400</size_limit>
    </core_dump>
    <!-- NOTE: no need to configure core_path,
         since clickhouse is not started as daemon (via clickhouse start)
    -->
    <core_path>$PWD</core_path>
</clickhouse>
EOL
}

function watchdog
@@ -180,7 +200,6 @@ handle SIGUSR2 nostop noprint pass
handle SIG$RTMIN nostop noprint pass
info signals
continue
gcore
backtrace full
thread apply all backtrace full
info registers
@@ -8,6 +8,9 @@ dmesg --clear

set -x

# core.COMM.PID-TID
sysctl kernel.core_pattern='core.%e.%p-%P'

# Thread Fuzzer allows to check more permutations of possible thread scheduling
# and find more potential issues.

@@ -104,6 +107,19 @@ EOL
        </default>
    </profiles>
</clickhouse>
EOL

    cat > /etc/clickhouse-server/config.d/core.xml <<EOL
<clickhouse>
    <core_dump>
        <!-- 100GiB -->
        <size_limit>107374182400</size_limit>
    </core_dump>
    <!-- NOTE: no need to configure core_path,
         since clickhouse is not started as daemon (via clickhouse start)
    -->
    <core_path>$PWD</core_path>
</clickhouse>
EOL
}

@@ -160,7 +176,6 @@ handle SIGUSR2 nostop noprint pass
handle SIG$RTMIN nostop noprint pass
info signals
continue
gcore
backtrace full
thread apply all backtrace full
info registers
@@ -504,8 +519,7 @@ done
clickhouse-local --structure "test String, res String" -q "SELECT 'failure', test FROM table WHERE res != 'OK' order by (lower(test) like '%hung%'), rowNumberInAllBlocks() LIMIT 1" < /test_output/test_results.tsv > /test_output/check_status.tsv
[ -s /test_output/check_status.tsv ] || echo -e "success\tNo errors found" > /test_output/check_status.tsv

# Core dumps (see gcore)
# Default filename is 'core.PROCESS_ID'
# Core dumps
for core in core.*; do
    pigz $core
    mv $core.gz /test_output/
@@ -140,6 +140,6 @@ hash cmake

ClickHouse is available in pre-built binaries and packages. Binaries are portable and can be run on any Linux flavour.

They are built for stable, prestable and testing releases as long as for every commit to master and for every pull request.
Binaries are built for stable and LTS releases, and also for every commit to `master` and for every pull request.

To find the freshest build from `master`, go to the [commits page](https://github.com/ClickHouse/ClickHouse/commits/master), click on the first green check mark or red cross near the commit, and click the "Details" link right after "ClickHouse Build Check".

@@ -4,10 +4,9 @@ sidebar_position: 1
keywords: [clickhouse, install, installation, docs]
description: ClickHouse can run on any Linux, FreeBSD, or Mac OS X with x86_64, AArch64, or PowerPC64LE CPU architecture.
slug: /en/getting-started/install
title: Installation
---

# Installation

## System Requirements {#system-requirements}

ClickHouse can run on any Linux, FreeBSD, or Mac OS X with x86_64, AArch64, or PowerPC64LE CPU architecture.

@@ -20,6 +20,7 @@ Additional cache types:
- [Avro format](../interfaces/formats.md#data-format-avro) schemas cache.
- [Dictionaries](../sql-reference/dictionaries/index.md) data cache.
- Schema inference cache.
- [Filesystem cache](storing-data.md) over S3, Azure, Local and other disks.

Indirectly used:
@@ -112,6 +112,119 @@ Example of disk configuration:
</clickhouse>
```

## Using local cache {#using-local-cache}

Starting from version 22.3 it is possible to configure a local cache over disks in the storage configuration. For versions 22.3 - 22.7 the cache is supported only for the `s3` disk type. For versions >= 22.8 the cache is supported for any disk type: S3, Azure, Local, Encrypted, etc. The cache uses the `LRU` eviction policy.

Example configuration for versions 22.8 and later:

``` xml
<clickhouse>
    <storage_configuration>
        <disks>
            <s3>
                <type>s3</type>
                <endpoint>...</endpoint>
                ... s3 configuration ...
            </s3>
            <cache>
                <type>cache</type>
                <disk>s3</disk>
                <path>/s3_cache/</path>
                <max_size>10000000</max_size>
            </cache>
        </disks>
    </storage_configuration>
</clickhouse>
```

Example configuration for versions earlier than 22.8:

``` xml
<clickhouse>
    <storage_configuration>
        <disks>
            <s3>
                <type>s3</type>
                <endpoint>...</endpoint>
                ... s3 configuration ...
                <data_cache_enabled>1</data_cache_enabled>
                <data_cache_size>10000000</data_cache_size>
            </s3>
        </disks>
    </storage_configuration>
</clickhouse>
```

Cache **configuration settings**:

- `path` - path to the directory with the cache. Default: none; this setting is mandatory.

- `max_size` - maximum size of the cache in bytes. When the limit is reached, cache files are evicted according to the cache eviction policy. Default: none; this setting is mandatory.

- `cache_on_write_operations` - enables the `write-through` cache (data is cached on any write operation: `INSERT` queries, background merges). Default: `false`. The `write-through` cache can be disabled per query using the setting `enable_filesystem_cache_on_write_operations` (data is cached only if both the cache config setting and the corresponding query setting are enabled).

- `enable_filesystem_query_cache_limit` - allows limiting the amount of cache downloaded within each query (depends on the user setting `max_query_cache_size`). Default: `false`.

- `enable_cache_hits_threshold` - the number of times data must be read before it is cached. Default: `0`, i.e. data is cached on the first attempt to read it.

- `do_not_evict_index_and_mark_files` - do not evict small, frequently used files according to the cache policy. Default: `true`.

- `max_file_segment_size` - the maximum size of a single cache file. Default: `104857600` (100 MiB).

- `max_elements` - a limit on the number of cache files. Default: `1048576`.

Cache **query settings**:

- `enable_filesystem_cache` - allows disabling the cache for a query even if the storage policy was configured with the `cache` disk type. Default: `true`.

- `read_from_filesystem_cache_if_exists_otherwise_bypass_cache` - use the cache in a query only if the data already exists in it; otherwise the data read by the query is not written to the local cache. Default: `false`.

- `enable_filesystem_cache_on_write_operations` - turns on the `write-through` cache. This setting works only if the setting `cache_on_write_operations` in the cache configuration is turned on.

- `enable_filesystem_cache_log` - turns on logging to the `system.filesystem_cache_log` table, which gives a detailed view of cache usage per query. Default: `false`.

- `max_query_cache_size` - a limit on the amount of data that can be written to the local cache within a single query. Requires `enable_filesystem_query_cache_limit` to be enabled in the cache configuration. Default: `false`.

- `skip_download_if_exceeds_query_cache` - changes the behaviour of `max_query_cache_size`. Default: `true`. If this setting is turned on and the per-query cache download limit is reached, no more data is downloaded to the cache. If it is turned off and the limit is reached, data is still written to the cache at the cost of evicting data previously downloaded within the current query, i.e. the second behaviour preserves `last recently used` semantics while still respecting the per-query cache limit.
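A minimal, hedged sketch of how these query-level settings can be attached to individual statements; the table name `test_table` is purely illustrative and not part of the original docs:

``` sql
-- Read path: use the cache for this query only if the data is already cached (illustrative table name).
SELECT count()
FROM test_table
SETTINGS enable_filesystem_cache = 1,
         read_from_filesystem_cache_if_exists_otherwise_bypass_cache = 1;

-- Write path: opt in to the write-through cache for a single INSERT
-- (takes effect only if cache_on_write_operations is enabled in the cache configuration).
INSERT INTO test_table SETTINGS enable_filesystem_cache_on_write_operations = 1 VALUES (1);
```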
**Warning**
Cache configuration settings and cache query settings correspond to the latest ClickHouse version; earlier versions may not support all of them.

Cache **system tables**:

- `system.filesystem_cache` - system table which shows the current state of the cache.

- `system.filesystem_cache_log` - system table which shows detailed cache usage per query. Requires the `enable_filesystem_cache_log` setting to be `true`.

Cache **commands**:

- `SYSTEM DROP FILESYSTEM CACHE (<path>) (ON CLUSTER)`

- `SHOW CACHES` - show the list of caches which were configured on the server.

- `DESCRIBE CACHE '<cache_name>'` - show the cache configuration and some general statistics for a specific cache. The cache name can be taken from the `SHOW CACHES` command.
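A short, hedged illustration of the commands above; the cache name `s3_cache` is an example value that would come from your own `SHOW CACHES` output:

``` sql
SHOW CACHES;
DESCRIBE CACHE 's3_cache';      -- example cache name taken from SHOW CACHES
SYSTEM DROP FILESYSTEM CACHE;   -- without a path, drops the filesystem caches on this server
```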
Cache current metrics:

- `FilesystemCacheSize`

- `FilesystemCacheElements`

Cache asynchronous metrics:

- `FilesystemCacheBytes`

- `FilesystemCacheFiles`

Cache profile events:

- `CachedReadBufferReadFromSourceBytes`, `CachedReadBufferReadFromCacheBytes`

- `CachedReadBufferReadFromSourceMicroseconds`, `CachedReadBufferReadFromCacheMicroseconds`

- `CachedReadBufferCacheWriteBytes`, `CachedReadBufferCacheWriteMicroseconds`

- `CachedWriteBufferCacheWriteBytes`, `CachedWriteBufferCacheWriteMicroseconds`

## Storing Data on Web Server {#storing-data-on-webserver}

There is a tool `clickhouse-static-files-uploader`, which prepares a data directory for a given table (`SELECT data_paths FROM system.tables WHERE name = 'table_name'`). For each table you need, you get a directory of files. These files can be uploaded to, for example, a web server with static files. After this preparation, you can load this table into any ClickHouse server via `DiskWeb`.
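As a small illustration of the preparation step mentioned above, the on-disk data paths that the uploader works with can be looked up directly (the table name is a placeholder):

``` sql
-- Placeholder table name; returns the data paths referenced in the paragraph above.
SELECT data_paths FROM system.tables WHERE name = 'table_name';
```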
@@ -641,7 +641,7 @@ Result:
## date\_diff

Returns the difference between two dates or dates with time values.
The difference is calculated using relative units, e.g. the difference between `2022-01-01` and `2021-12-29` is 3 days for day unit (see [toRelativeDayNum](#toRelativeDayNum)), 1 month for month unit (see [toRelativeMonthNum](#toRelativeMonthNum)), 1 year for year unit (see [toRelativeYearNum](#toRelativeYearNum)).
The difference is calculated using relative units, e.g. the difference between `2022-01-01` and `2021-12-29` is 3 days for day unit (see [toRelativeDayNum](#torelativedaynum)), 1 month for month unit (see [toRelativeMonthNum](#torelativemonthnum)), 1 year for year unit (see [toRelativeYearNum](#torelativeyearnum)).
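A quick example consistent with the dates used in the description above:

``` sql
SELECT date_diff('day',   toDate('2021-12-29'), toDate('2022-01-01'));  -- 3
SELECT date_diff('month', toDate('2021-12-29'), toDate('2022-01-01'));  -- 1
SELECT date_diff('year',  toDate('2021-12-29'), toDate('2022-01-01'));  -- 1
```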
**Syntax**

@@ -10,7 +10,7 @@ Makes the server "forget" about the existence of a table, a materialized view, or a dictionary.

**Syntax**

``` sql
DETACH TABLE|VIEW|DICTIONARY [IF EXISTS] [db.]name [ON CLUSTER cluster] [PERMANENTLY]
DETACH TABLE|VIEW|DICTIONARY [IF EXISTS] [db.]name [ON CLUSTER cluster] [PERMANENTLY] [SYNC]
```

Detaching does not delete the data or metadata of a table, a materialized view or a dictionary. If an entity was not detached `PERMANENTLY`, on the next server launch the server will read the metadata and recall the table/view/dictionary again. If an entity was detached `PERMANENTLY`, there will be no automatic recall.

@@ -24,6 +24,8 @@ Note that you can not detach permanently the table which is already detached (te

You also cannot [DROP](../../sql-reference/statements/drop#drop-table) the detached table, [CREATE TABLE](../../sql-reference/statements/create/table.md) with the same name as a permanently detached table, or replace it with another table using a [RENAME TABLE](../../sql-reference/statements/rename.md) query.

The `SYNC` modifier executes the action without delay.
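A minimal, hedged sketch of the new modifier (the table name is a placeholder; a fuller example follows in the original docs):

``` sql
-- Placeholder table name; PERMANENTLY and SYNC are both optional.
DETACH TABLE IF EXISTS db.my_table PERMANENTLY SYNC;
```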
**Example**

Creating a table:

@@ -6,7 +6,7 @@ sidebar_label: DROP

# DROP Statements

Deletes existing entity. If the `IF EXISTS` clause is specified, these queries do not return an error if the entity does not exist.
Deletes existing entity. If the `IF EXISTS` clause is specified, these queries do not return an error if the entity does not exist. If the `SYNC` modifier is specified, the entity is dropped without delay.
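A short, hedged illustration of the `SYNC` modifier with the DROP statements documented below (all names are placeholders):

``` sql
-- Placeholder names; SYNC makes the drop complete without delay.
DROP TABLE IF EXISTS db.my_table SYNC;
DROP DATABASE IF EXISTS my_db ON CLUSTER my_cluster SYNC;
```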
## DROP DATABASE

@@ -15,7 +15,7 @@ Deletes all tables inside the `db` database, then deletes the `db` database itself.
Syntax:

``` sql
DROP DATABASE [IF EXISTS] db [ON CLUSTER cluster]
DROP DATABASE [IF EXISTS] db [ON CLUSTER cluster] [SYNC]
```

## DROP TABLE
@@ -25,7 +25,7 @@ Deletes the table.
Syntax:

``` sql
DROP [TEMPORARY] TABLE [IF EXISTS] [db.]name [ON CLUSTER cluster]
DROP [TEMPORARY] TABLE [IF EXISTS] [db.]name [ON CLUSTER cluster] [SYNC]
```

## DROP DICTIONARY
@@ -35,7 +35,7 @@ Deletes the dictionary.
Syntax:

``` sql
DROP DICTIONARY [IF EXISTS] [db.]name
DROP DICTIONARY [IF EXISTS] [db.]name [SYNC]
```

## DROP USER
@@ -95,7 +95,7 @@ Deletes a view. Views can be deleted by a `DROP TABLE` command as well but `DROP
Syntax:

``` sql
DROP VIEW [IF EXISTS] [db.]name [ON CLUSTER cluster]
DROP VIEW [IF EXISTS] [db.]name [ON CLUSTER cluster] [SYNC]
```

## DROP FUNCTION
src/Backups/BackupEntryWrappedWith.h (new file, 37 lines)

@@ -0,0 +1,37 @@
#pragma once

#include <Backups/IBackupEntry.h>


namespace DB
{

/// Wraps another backup entry and a value of any type.
template <typename T>
class BackupEntryWrappedWith : public IBackupEntry
{
public:
    BackupEntryWrappedWith(BackupEntryPtr entry_, const T & custom_value_) : entry(entry_), custom_value(custom_value_) { }
    BackupEntryWrappedWith(BackupEntryPtr entry_, T && custom_value_) : entry(entry_), custom_value(std::move(custom_value_)) { }
    ~BackupEntryWrappedWith() override = default;

    UInt64 getSize() const override { return entry->getSize(); }
    std::optional<UInt128> getChecksum() const override { return entry->getChecksum(); }
    std::unique_ptr<SeekableReadBuffer> getReadBuffer() const override { return entry->getReadBuffer(); }
    String getFilePath() const override { return entry->getFilePath(); }
    DiskPtr tryGetDiskIfExists() const override { return entry->tryGetDiskIfExists(); }
    DataSourceDescription getDataSourceDescription() const override { return entry->getDataSourceDescription(); }

private:
    BackupEntryPtr entry;
    T custom_value;
};

template <typename T>
void wrapBackupEntriesWith(std::vector<std::pair<String, BackupEntryPtr>> & backup_entries, const T & custom_value)
{
    for (auto & [_, backup_entry] : backup_entries)
        backup_entry = std::make_shared<BackupEntryWrappedWith<T>>(std::move(backup_entry), custom_value);
}

}

@@ -50,7 +50,7 @@ ColumnArray::ColumnArray(MutableColumnPtr && nested_column, MutableColumnPtr &&
    if (!offsets_concrete)
        throw Exception("offsets_column must be a ColumnUInt64", ErrorCodes::LOGICAL_ERROR);

    if (!offsets_concrete->empty() && data)
    if (!offsets_concrete->empty() && data && !data->empty())
    {
        Offset last_offset = offsets_concrete->getData().back();
@@ -898,4 +898,25 @@ ZooKeeperRequestFactory::ZooKeeperRequestFactory()
    registerZooKeeperRequest<OpNum::FilteredList, ZooKeeperFilteredListRequest>(*this);
}

PathMatchResult matchPath(std::string_view path, std::string_view match_to)
{
    using enum PathMatchResult;

    if (path.ends_with('/'))
        path.remove_suffix(1);

    if (match_to.ends_with('/'))
        match_to.remove_suffix(1);

    auto [first_it, second_it] = std::mismatch(path.begin(), path.end(), match_to.begin(), match_to.end());

    if (second_it != match_to.end())
        return NOT_MATCH;

    if (first_it == path.end())
        return EXACT;

    return *first_it == '/' ? IS_CHILD : NOT_MATCH;
}

}

@@ -554,4 +554,13 @@ private:
    ZooKeeperRequestFactory();
};

enum class PathMatchResult
{
    NOT_MATCH,
    EXACT,
    IS_CHILD
};

PathMatchResult matchPath(std::string_view path, std::string_view match_to);

}

src/Common/ZooKeeper/tests/gtest_zookeeper.cpp (new file, 15 lines)

@@ -0,0 +1,15 @@
#include <gtest/gtest.h>

#include <Common/ZooKeeper/ZooKeeperCommon.h>

TEST(ZooKeeperTest, TestMatchPath)
{
    using namespace Coordination;

    ASSERT_EQ(matchPath("/path/file", "/path"), PathMatchResult::IS_CHILD);
    ASSERT_EQ(matchPath("/path/file", "/path/"), PathMatchResult::IS_CHILD);
    ASSERT_EQ(matchPath("/path/file", "/"), PathMatchResult::IS_CHILD);
    ASSERT_EQ(matchPath("/", "/"), PathMatchResult::EXACT);
    ASSERT_EQ(matchPath("/path", "/path/"), PathMatchResult::EXACT);
    ASSERT_EQ(matchPath("/path/", "/path"), PathMatchResult::EXACT);
}
@@ -116,8 +116,8 @@ ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(
        }
    };

    ISerialization::SubstreamPath path;
    column_type->getDefaultSerialization()->enumerateStreams(path, callback, column_type);
    auto serialization = column_type->getDefaultSerialization();
    serialization->enumerateStreams(callback, column_type);

    if (!result_codec)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot find any substream with data type for type {}. It's a bug", column_type->getName());

@@ -13,8 +13,10 @@
#include <filesystem>
#include <memory>
#include <Common/logger_useful.h>
#include "Coordination/KeeperContext.h"
#include <Coordination/KeeperContext.h>
#include <Coordination/KeeperConstants.h>
#include <Common/ZooKeeper/ZooKeeperCommon.h>


namespace DB
{
@@ -146,33 +148,6 @@ namespace
    }
}

namespace
{

enum class PathMatchResult
{
    NOT_MATCH,
    EXACT,
    IS_CHILD
};

PathMatchResult matchPath(const std::string_view path, const std::string_view match_to)
{
    using enum PathMatchResult;

    auto [first_it, second_it] = std::mismatch(path.begin(), path.end(), match_to.begin(), match_to.end());

    if (second_it != match_to.end())
        return NOT_MATCH;

    if (first_it == path.end())
        return EXACT;

    return *first_it == '/' ? IS_CHILD : NOT_MATCH;
}

}

void KeeperStorageSnapshot::serialize(const KeeperStorageSnapshot & snapshot, WriteBuffer & out, KeeperContextPtr keeper_context)
{
    writeBinary(static_cast<uint8_t>(snapshot.version), out);
@@ -217,7 +192,7 @@ void KeeperStorageSnapshot::serialize(const KeeperStorageSnapshot & snapshot, Wr
        const auto & path = it->key;

        // write only the root system path because of digest
        if (matchPath(path.toView(), keeper_system_path) == PathMatchResult::IS_CHILD)
        if (Coordination::matchPath(path.toView(), keeper_system_path) == Coordination::PathMatchResult::IS_CHILD)
        {
            ++it;
            continue;
@@ -365,8 +340,8 @@ void KeeperStorageSnapshot::deserialize(SnapshotDeserializationResult & deserial
        KeeperStorage::Node node{};
        readNode(node, in, current_version, storage.acl_map);

        using enum PathMatchResult;
        auto match_result = matchPath(path, keeper_system_path);
        using enum Coordination::PathMatchResult;
        auto match_result = Coordination::matchPath(path, keeper_system_path);

        const std::string error_msg = fmt::format("Cannot read node on path {} from a snapshot because it is used as a system node", path);
        if (match_result == IS_CHILD)
@@ -879,7 +879,7 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr
        path_created += seq_num_str.str();
    }

    if (path_created.starts_with(keeper_system_path))
    if (Coordination::matchPath(path_created, keeper_system_path) != Coordination::PathMatchResult::NOT_MATCH)
    {
        auto error_msg = fmt::format("Trying to create a node inside the internal Keeper path ({}) which is not allowed. Path: {}", keeper_system_path, path_created);

@@ -1049,7 +1049,7 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr

    std::vector<KeeperStorage::Delta> new_deltas;

    if (request.path.starts_with(keeper_system_path))
    if (Coordination::matchPath(request.path, keeper_system_path) != Coordination::PathMatchResult::NOT_MATCH)
    {
        auto error_msg = fmt::format("Trying to delete an internal Keeper path ({}) which is not allowed", request.path);

@@ -1203,7 +1203,7 @@ struct KeeperStorageSetRequestProcessor final : public KeeperStorageRequestProce

    std::vector<KeeperStorage::Delta> new_deltas;

    if (request.path.starts_with(keeper_system_path))
    if (Coordination::matchPath(request.path, keeper_system_path) != Coordination::PathMatchResult::NOT_MATCH)
    {
        auto error_msg = fmt::format("Trying to update an internal Keeper path ({}) which is not allowed", request.path);

@@ -1472,7 +1472,7 @@ struct KeeperStorageSetACLRequestProcessor final : public KeeperStorageRequestPr
    {
        Coordination::ZooKeeperSetACLRequest & request = dynamic_cast<Coordination::ZooKeeperSetACLRequest &>(*zk_request);

        if (request.path.starts_with(keeper_system_path))
        if (Coordination::matchPath(request.path, keeper_system_path) != Coordination::PathMatchResult::NOT_MATCH)
        {
            auto error_msg = fmt::format("Trying to update an internal Keeper path ({}) which is not allowed", request.path);

@@ -2141,6 +2141,38 @@ TEST_P(CoordinationTest, TestCurrentApiVersion)
    EXPECT_EQ(keeper_version, static_cast<uint8_t>(current_keeper_api_version));
}

TEST_P(CoordinationTest, TestSystemNodeModify)
{
    using namespace Coordination;
    int64_t zxid{0};

    // On INIT we abort when a system path is modified
    keeper_context->server_state = KeeperContext::Phase::RUNNING;
    KeeperStorage storage{500, "", keeper_context};
    const auto assert_create = [&](const std::string_view path, const auto expected_code)
    {
        auto request = std::make_shared<ZooKeeperCreateRequest>();
        request->path = path;
        storage.preprocessRequest(request, 0, 0, zxid);
        auto responses = storage.processRequest(request, 0, zxid);
        ASSERT_FALSE(responses.empty());

        const auto & response = responses[0];
        ASSERT_EQ(response.response->error, expected_code) << "Unexpected error for path " << path;

        ++zxid;
    };

    assert_create("/keeper", Error::ZBADARGUMENTS);
    assert_create("/keeper/with_child", Error::ZBADARGUMENTS);
    assert_create(DB::keeper_api_version_path, Error::ZBADARGUMENTS);

    assert_create("/keeper_map", Error::ZOK);
    assert_create("/keeper1", Error::ZOK);
    assert_create("/keepe", Error::ZOK);
    assert_create("/keeper1/test", Error::ZOK);
}

INSTANTIATE_TEST_SUITE_P(CoordinationTestSuite,
    CoordinationTest,
    ::testing::ValuesIn(std::initializer_list<CompressionParam>{
@@ -84,18 +84,20 @@ void IDataType::forEachSubcolumn(
    {
        for (size_t i = 0; i < subpath.size(); ++i)
        {
            if (!subpath[i].visited && ISerialization::hasSubcolumnForPath(subpath, i + 1))
            size_t prefix_len = i + 1;
            if (!subpath[i].visited && ISerialization::hasSubcolumnForPath(subpath, prefix_len))
            {
                auto name = ISerialization::getSubcolumnNameForStream(subpath, i + 1);
                auto subdata = ISerialization::createFromPath(subpath, i);
                auto name = ISerialization::getSubcolumnNameForStream(subpath, prefix_len);
                auto subdata = ISerialization::createFromPath(subpath, prefix_len);
                callback(subpath, name, subdata);
            }
            subpath[i].visited = true;
        }
    };

    SubstreamPath path;
    data.serialization->enumerateStreams(path, callback_with_data, data);
    ISerialization::EnumerateStreamsSettings settings;
    settings.position_independent_encoding = false;
    data.serialization->enumerateStreams(settings, callback_with_data, data);
}

template <typename Ptr>
@@ -118,33 +120,38 @@ Ptr IDataType::getForSubcolumn(
    return res;
}

bool IDataType::hasSubcolumn(const String & subcolumn_name) const
{
    return tryGetSubcolumnType(subcolumn_name) != nullptr;
}

DataTypePtr IDataType::tryGetSubcolumnType(const String & subcolumn_name) const
{
    SubstreamData data = { getDefaultSerialization(), getPtr(), nullptr, nullptr };
    auto data = SubstreamData(getDefaultSerialization()).withType(getPtr());
    return getForSubcolumn<DataTypePtr>(subcolumn_name, data, &SubstreamData::type, false);
}

DataTypePtr IDataType::getSubcolumnType(const String & subcolumn_name) const
{
    SubstreamData data = { getDefaultSerialization(), getPtr(), nullptr, nullptr };
    auto data = SubstreamData(getDefaultSerialization()).withType(getPtr());
    return getForSubcolumn<DataTypePtr>(subcolumn_name, data, &SubstreamData::type, true);
}

ColumnPtr IDataType::tryGetSubcolumn(const String & subcolumn_name, const ColumnPtr & column) const
{
    SubstreamData data = { getDefaultSerialization(), nullptr, column, nullptr };
    auto data = SubstreamData(getDefaultSerialization()).withColumn(column);
    return getForSubcolumn<ColumnPtr>(subcolumn_name, data, &SubstreamData::column, false);
}

ColumnPtr IDataType::getSubcolumn(const String & subcolumn_name, const ColumnPtr & column) const
{
    SubstreamData data = { getDefaultSerialization(), nullptr, column, nullptr };
    auto data = SubstreamData(getDefaultSerialization()).withColumn(column);
    return getForSubcolumn<ColumnPtr>(subcolumn_name, data, &SubstreamData::column, true);
}

SerializationPtr IDataType::getSubcolumnSerialization(const String & subcolumn_name, const SerializationPtr & serialization) const
{
    SubstreamData data = { serialization, nullptr, nullptr, nullptr };
    auto data = SubstreamData(serialization);
    return getForSubcolumn<SerializationPtr>(subcolumn_name, data, &SubstreamData::serialization, true);
}

@@ -154,7 +161,7 @@ Names IDataType::getSubcolumnNames() const
    forEachSubcolumn([&](const auto &, const auto & name, const auto &)
    {
        res.push_back(name);
    }, { getDefaultSerialization(), nullptr, nullptr, nullptr });
    }, SubstreamData(getDefaultSerialization()));
    return res;
}

@@ -79,6 +79,8 @@ public:
    /// Data type id. It's used for runtime type checks.
    virtual TypeIndex getTypeId() const = 0;

    bool hasSubcolumn(const String & subcolumn_name) const;

    DataTypePtr tryGetSubcolumnType(const String & subcolumn_name) const;
    DataTypePtr getSubcolumnType(const String & subcolumn_name) const;
@@ -73,24 +73,24 @@ String ISerialization::SubstreamPath::toString() const
}

void ISerialization::enumerateStreams(
    SubstreamPath & path,
    EnumerateStreamsSettings & settings,
    const StreamCallback & callback,
    const SubstreamData & data) const
{
    path.push_back(Substream::Regular);
    path.back().data = data;
    callback(path);
    path.pop_back();
    settings.path.push_back(Substream::Regular);
    settings.path.back().data = data;
    callback(settings.path);
    settings.path.pop_back();
}

void ISerialization::enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const
void ISerialization::enumerateStreams(
    const StreamCallback & callback,
    const DataTypePtr & type,
    const ColumnPtr & column) const
{
    enumerateStreams(path, callback, {getPtr(), nullptr, nullptr, nullptr});
}

void ISerialization::enumerateStreams(SubstreamPath & path, const StreamCallback & callback, const DataTypePtr & type) const
{
    enumerateStreams(path, callback, {getPtr(), type, nullptr, nullptr});
    EnumerateStreamsSettings settings;
    auto data = SubstreamData(getPtr()).withType(type).withColumn(column);
    enumerateStreams(settings, callback, data);
}

void ISerialization::serializeBinaryBulk(const IColumn & column, WriteBuffer &, size_t, size_t) const
@@ -184,7 +184,7 @@ String ISerialization::getFileNameForStream(const NameAndTypePair & column, cons
    return getFileNameForStream(column.getNameInStorage(), path);
}

static size_t isOffsetsOfNested(const ISerialization::SubstreamPath & path)
bool isOffsetsOfNested(const ISerialization::SubstreamPath & path)
{
    if (path.empty())
        return false;
@@ -287,10 +287,13 @@ bool ISerialization::hasSubcolumnForPath(const SubstreamPath & path, size_t pref

ISerialization::SubstreamData ISerialization::createFromPath(const SubstreamPath & path, size_t prefix_len)
{
    assert(prefix_len < path.size());
    assert(prefix_len <= path.size());
    if (prefix_len == 0)
        return {};

    SubstreamData res = path[prefix_len].data;
    for (ssize_t i = static_cast<ssize_t>(prefix_len) - 1; i >= 0; --i)
    ssize_t last_elem = prefix_len - 1;
    auto res = path[last_elem].data;
    for (ssize_t i = last_elem - 1; i >= 0; --i)
    {
        const auto & creator = path[i].creator;
        if (creator)

@@ -101,6 +101,30 @@ public:

    struct SubstreamData
    {
        SubstreamData() = default;
        SubstreamData(SerializationPtr serialization_)
            : serialization(std::move(serialization_))
        {
        }

        SubstreamData & withType(DataTypePtr type_)
        {
            type = std::move(type_);
            return *this;
        }

        SubstreamData & withColumn(ColumnPtr column_)
        {
            column = std::move(column_);
            return *this;
        }

        SubstreamData & withSerializationInfo(SerializationInfoPtr serialization_info_)
        {
            serialization_info = std::move(serialization_info_);
            return *this;
        }

        SerializationPtr serialization;
        DataTypePtr type;
        ColumnPtr column;
@@ -164,16 +188,22 @@ public:

    using StreamCallback = std::function<void(const SubstreamPath &)>;

    struct EnumerateStreamsSettings
    {
        SubstreamPath path;
        bool position_independent_encoding = true;
    };

    virtual void enumerateStreams(
        SubstreamPath & path,
        EnumerateStreamsSettings & settings,
        const StreamCallback & callback,
        const SubstreamData & data) const;

    void enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const;
    void enumerateStreams(const StreamCallback & callback, SubstreamPath && path) const { enumerateStreams(callback, path); }
    void enumerateStreams(const StreamCallback & callback) const { enumerateStreams(callback, {}); }

    void enumerateStreams(SubstreamPath & path, const StreamCallback & callback, const DataTypePtr & type) const;
    /// Enumerate streams with default settings.
    void enumerateStreams(
        const StreamCallback & callback,
        const DataTypePtr & type = nullptr,
        const ColumnPtr & column = nullptr) const;

    using OutputStreamGetter = std::function<WriteBuffer*(const SubstreamPath &)>;
    using InputStreamGetter = std::function<ReadBuffer*(const SubstreamPath &)>;
@@ -375,4 +405,6 @@ State * ISerialization::checkAndGetState(const StatePtr & state) const
    return state_concrete;
}

bool isOffsetsOfNested(const ISerialization::SubstreamPath & path);

}
@@ -155,30 +155,30 @@ namespace

        return column_offsets;
    }
}

ColumnPtr arrayOffsetsToSizes(const IColumn & column)
{
    const auto & column_offsets = assert_cast<const ColumnArray::ColumnOffsets &>(column);
    MutableColumnPtr column_sizes = column_offsets.cloneEmpty();

    if (column_offsets.empty())
        return column_sizes;

    const auto & offsets_data = column_offsets.getData();
    auto & sizes_data = assert_cast<ColumnArray::ColumnOffsets &>(*column_sizes).getData();

    sizes_data.resize(offsets_data.size());

    IColumn::Offset prev_offset = 0;
    for (size_t i = 0, size = offsets_data.size(); i < size; ++i)
    ColumnPtr arrayOffsetsToSizes(const IColumn & column)
    {
        auto current_offset = offsets_data[i];
        sizes_data[i] = current_offset - prev_offset;
        prev_offset = current_offset;
    }
        const auto & column_offsets = assert_cast<const ColumnArray::ColumnOffsets &>(column);
        MutableColumnPtr column_sizes = column_offsets.cloneEmpty();

    return column_sizes;
        if (column_offsets.empty())
            return column_sizes;

        const auto & offsets_data = column_offsets.getData();
        auto & sizes_data = assert_cast<ColumnArray::ColumnOffsets &>(*column_sizes).getData();

        sizes_data.resize(offsets_data.size());

        IColumn::Offset prev_offset = 0;
        for (size_t i = 0, size = offsets_data.size(); i < size; ++i)
        {
            auto current_offset = offsets_data[i];
            sizes_data[i] = current_offset - prev_offset;
            prev_offset = current_offset;
        }

        return column_sizes;
    }
}

DataTypePtr SerializationArray::SubcolumnCreator::create(const DataTypePtr & prev) const
@@ -197,41 +197,42 @@ ColumnPtr SerializationArray::SubcolumnCreator::create(const ColumnPtr & prev) c
}

void SerializationArray::enumerateStreams(
    SubstreamPath & path,
    EnumerateStreamsSettings & settings,
    const StreamCallback & callback,
    const SubstreamData & data) const
{
    const auto * type_array = data.type ? &assert_cast<const DataTypeArray &>(*data.type) : nullptr;
    const auto * column_array = data.column ? &assert_cast<const ColumnArray &>(*data.column) : nullptr;
    auto offsets_column = column_array ? column_array->getOffsetsPtr() : nullptr;
    auto offsets = column_array ? column_array->getOffsetsPtr() : nullptr;

    path.push_back(Substream::ArraySizes);
    path.back().data =
    {
    auto offsets_serialization =
        std::make_shared<SerializationNamed>(
            std::make_shared<SerializationNumber<UInt64>>(),
            "size" + std::to_string(getArrayLevel(path)), false),
        data.type ? std::make_shared<DataTypeUInt64>() : nullptr,
        offsets_column ? arrayOffsetsToSizes(*offsets_column) : nullptr,
        data.serialization_info,
    };
            "size" + std::to_string(getArrayLevel(settings.path)), false);

    callback(path);
    auto offsets_column = offsets && !settings.position_independent_encoding
        ? arrayOffsetsToSizes(*offsets)
        : offsets;

    path.back() = Substream::ArrayElements;
    path.back().data = data;
    path.back().creator = std::make_shared<SubcolumnCreator>(offsets_column);
    settings.path.push_back(Substream::ArraySizes);
    settings.path.back().data = SubstreamData(offsets_serialization)
        .withType(type_array ? std::make_shared<DataTypeUInt64>() : nullptr)
        .withColumn(std::move(offsets_column))
        .withSerializationInfo(data.serialization_info);

    SubstreamData next_data =
    {
        nested,
        type_array ? type_array->getNestedType() : nullptr,
        column_array ? column_array->getDataPtr() : nullptr,
        data.serialization_info,
    };
    callback(settings.path);

    nested->enumerateStreams(path, callback, next_data);
    path.pop_back();
    settings.path.back() = Substream::ArrayElements;
    settings.path.back().data = data;
    settings.path.back().creator = std::make_shared<SubcolumnCreator>(offsets);

    auto next_data = SubstreamData(nested)
        .withType(type_array ? type_array->getNestedType() : nullptr)
        .withColumn(column_array ? column_array->getDataPtr() : nullptr)
        .withSerializationInfo(data.serialization_info);

    nested->enumerateStreams(settings, callback, next_data);
    settings.path.pop_back();
}

void SerializationArray::serializeBinaryBulkStatePrefix(

@@ -36,7 +36,7 @@ public:
     */

    void enumerateStreams(
        SubstreamPath & path,
        EnumerateStreamsSettings & settings,
        const StreamCallback & callback,
        const SubstreamData & data) const override;

@@ -79,6 +79,4 @@ private:
    };
};

ColumnPtr arrayOffsetsToSizes(const IColumn & column);

}
@@ -41,30 +41,26 @@ SerializationLowCardinality::SerializationLowCardinality(const DataTypePtr & dic
}

void SerializationLowCardinality::enumerateStreams(
    SubstreamPath & path,
    EnumerateStreamsSettings & settings,
    const StreamCallback & callback,
    const SubstreamData & data) const
{
    const auto * column_lc = data.column ? &getColumnLowCardinality(*data.column) : nullptr;

    SubstreamData dict_data =
    {
        dict_inner_serialization,
        data.type ? dictionary_type : nullptr,
        column_lc ? column_lc->getDictionary().getNestedColumn() : nullptr,
        data.serialization_info,
    };
    settings.path.push_back(Substream::DictionaryKeys);
    auto dict_data = SubstreamData(dict_inner_serialization)
        .withType(data.type ? dictionary_type : nullptr)
        .withColumn(column_lc ? column_lc->getDictionary().getNestedColumn() : nullptr)
        .withSerializationInfo(data.serialization_info);

    path.push_back(Substream::DictionaryKeys);
    path.back().data = dict_data;
    settings.path.back().data = dict_data;
    dict_inner_serialization->enumerateStreams(settings, callback, dict_data);

    dict_inner_serialization->enumerateStreams(path, callback, dict_data);
    settings.path.back() = Substream::DictionaryIndexes;
    settings.path.back().data = data;

    path.back() = Substream::DictionaryIndexes;
    path.back().data = data;

    callback(path);
    path.pop_back();
    callback(settings.path);
    settings.path.pop_back();
}

struct KeysSerializationVersion

@@ -18,7 +18,7 @@ public:
    explicit SerializationLowCardinality(const DataTypePtr & dictionary_type);

    void enumerateStreams(
        SubstreamPath & path,
        EnumerateStreamsSettings & settings,
        const StreamCallback & callback,
        const SubstreamData & data) const override;
@@ -257,19 +257,16 @@ void SerializationMap::deserializeTextCSV(IColumn & column, ReadBuffer & istr, c
}

void SerializationMap::enumerateStreams(
    SubstreamPath & path,
    EnumerateStreamsSettings & settings,
    const StreamCallback & callback,
    const SubstreamData & data) const
{
    SubstreamData next_data =
    {
        nested,
        data.type ? assert_cast<const DataTypeMap &>(*data.type).getNestedType() : nullptr,
        data.column ? assert_cast<const ColumnMap &>(*data.column).getNestedColumnPtr() : nullptr,
        data.serialization_info,
    };
    auto next_data = SubstreamData(nested)
        .withType(data.type ? assert_cast<const DataTypeMap &>(*data.type).getNestedType() : nullptr)
        .withColumn(data.column ? assert_cast<const ColumnMap &>(*data.column).getNestedColumnPtr() : nullptr)
        .withSerializationInfo(data.serialization_info);

    nested->enumerateStreams(path, callback, next_data);
    nested->enumerateStreams(settings, callback, next_data);
}

void SerializationMap::serializeBinaryBulkStatePrefix(

@@ -32,7 +32,7 @@ public:
    void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override;

    void enumerateStreams(
        SubstreamPath & path,
        EnumerateStreamsSettings & settings,
        const StreamCallback & callback,
        const SubstreamData & data) const override;
@@ -4,16 +4,16 @@ namespace DB
{

void SerializationNamed::enumerateStreams(
    SubstreamPath & path,
    EnumerateStreamsSettings & settings,
    const StreamCallback & callback,
    const SubstreamData & data) const
{
    addToPath(path);
    path.back().data = data;
    path.back().creator = std::make_shared<SubcolumnCreator>(name, escape_delimiter);
    addToPath(settings.path);
    settings.path.back().data = data;
    settings.path.back().creator = std::make_shared<SubcolumnCreator>(name, escape_delimiter);

    nested_serialization->enumerateStreams(path, callback, data);
    path.pop_back();
    nested_serialization->enumerateStreams(settings, callback, data);
    settings.path.pop_back();
}

void SerializationNamed::serializeBinaryBulkStatePrefix(

@@ -26,7 +26,7 @@ public:
    const String & getElementName() const { return name; }

    void enumerateStreams(
        SubstreamPath & path,
        EnumerateStreamsSettings & settings,
        const StreamCallback & callback,
        const SubstreamData & data) const override;
@ -38,38 +38,35 @@ ColumnPtr SerializationNullable::SubcolumnCreator::create(const ColumnPtr & prev
|
||||
}
|
||||
|
||||
void SerializationNullable::enumerateStreams(
|
||||
SubstreamPath & path,
|
||||
EnumerateStreamsSettings & settings,
|
||||
const StreamCallback & callback,
|
||||
const SubstreamData & data) const
|
||||
{
|
||||
const auto * type_nullable = data.type ? &assert_cast<const DataTypeNullable &>(*data.type) : nullptr;
|
||||
const auto * column_nullable = data.column ? &assert_cast<const ColumnNullable &>(*data.column) : nullptr;
|
||||
|
||||
path.push_back(Substream::NullMap);
|
||||
path.back().data =
|
||||
{
|
||||
std::make_shared<SerializationNamed>(std::make_shared<SerializationNumber<UInt8>>(), "null", false),
|
||||
type_nullable ? std::make_shared<DataTypeUInt8>() : nullptr,
|
||||
column_nullable ? column_nullable->getNullMapColumnPtr() : nullptr,
|
||||
data.serialization_info,
|
||||
};
|
||||
auto null_map_serialization = std::make_shared<SerializationNamed>(std::make_shared<SerializationNumber<UInt8>>(), "null", false);
|
||||
|
||||
callback(path);
|
||||
settings.path.push_back(Substream::NullMap);
|
||||
auto null_map_data = SubstreamData(null_map_serialization)
|
||||
.withType(type_nullable ? std::make_shared<DataTypeUInt8>() : nullptr)
|
||||
.withColumn(column_nullable ? column_nullable->getNullMapColumnPtr() : nullptr)
|
||||
.withSerializationInfo(data.serialization_info);
|
||||
|
||||
path.back() = Substream::NullableElements;
|
||||
path.back().creator = std::make_shared<SubcolumnCreator>(path.back().data.column);
|
||||
path.back().data = data;
|
||||
settings.path.back().data = null_map_data;
|
||||
callback(settings.path);
|
||||
|
||||
SubstreamData next_data =
|
||||
{
|
||||
nested,
|
||||
type_nullable ? type_nullable->getNestedType() : nullptr,
|
||||
column_nullable ? column_nullable->getNestedColumnPtr() : nullptr,
|
||||
data.serialization_info,
|
||||
};
|
||||
settings.path.back() = Substream::NullableElements;
|
||||
settings.path.back().creator = std::make_shared<SubcolumnCreator>(null_map_data.column);
|
||||
settings.path.back().data = data;
|
||||
|
||||
nested->enumerateStreams(path, callback, next_data);
|
||||
path.pop_back();
|
||||
auto next_data = SubstreamData(nested)
|
||||
.withType(type_nullable ? type_nullable->getNestedType() : nullptr)
|
||||
.withColumn(column_nullable ? column_nullable->getNestedColumnPtr() : nullptr)
|
||||
.withSerializationInfo(data.serialization_info);
|
||||
|
||||
nested->enumerateStreams(settings, callback, next_data);
|
||||
settings.path.pop_back();
|
||||
}
|
||||
|
||||
void SerializationNullable::serializeBinaryBulkStatePrefix(
|
||||
|
@ -14,7 +14,7 @@ public:
explicit SerializationNullable(const SerializationPtr & nested_) : nested(nested_) {}

void enumerateStreams(
SubstreamPath & path,
EnumerateStreamsSettings & settings,
const StreamCallback & callback,
const SubstreamData & data) const override;

@ -148,39 +148,33 @@ ColumnPtr SerializationSparse::SubcolumnCreator::create(const ColumnPtr & prev)
}

void SerializationSparse::enumerateStreams(
SubstreamPath & path,
EnumerateStreamsSettings & settings,
const StreamCallback & callback,
const SubstreamData & data) const
{
const auto * column_sparse = data.column ? &assert_cast<const ColumnSparse &>(*data.column) : nullptr;

size_t column_size = column_sparse ? column_sparse->size() : 0;

path.push_back(Substream::SparseOffsets);
path.back().data =
{
std::make_shared<SerializationNumber<UInt64>>(),
data.type ? std::make_shared<DataTypeUInt64>() : nullptr,
column_sparse ? column_sparse->getOffsetsPtr() : nullptr,
data.serialization_info,
};
settings.path.push_back(Substream::SparseOffsets);
auto offsets_data = SubstreamData(std::make_shared<SerializationNumber<UInt64>>())
.withType(data.type ? std::make_shared<DataTypeUInt64>() : nullptr)
.withColumn(column_sparse ? column_sparse->getOffsetsPtr() : nullptr)
.withSerializationInfo(data.serialization_info);

callback(path);
settings.path.back().data = offsets_data;
callback(settings.path);

path.back() = Substream::SparseElements;
path.back().creator = std::make_shared<SubcolumnCreator>(path.back().data.column, column_size);
path.back().data = data;
settings.path.back() = Substream::SparseElements;
settings.path.back().creator = std::make_shared<SubcolumnCreator>(offsets_data.column, column_size);
settings.path.back().data = data;

SubstreamData next_data =
{
nested,
data.type,
column_sparse ? column_sparse->getValuesPtr() : nullptr,
data.serialization_info,
};
auto next_data = SubstreamData(nested)
.withType(data.type)
.withColumn(column_sparse ? column_sparse->getValuesPtr() : nullptr)
.withSerializationInfo(data.serialization_info);

nested->enumerateStreams(path, callback, next_data);
path.pop_back();
nested->enumerateStreams(settings, callback, next_data);
settings.path.pop_back();
}

void SerializationSparse::serializeBinaryBulkStatePrefix(
@ -28,7 +28,7 @@ public:
Kind getKind() const override { return Kind::SPARSE; }

virtual void enumerateStreams(
SubstreamPath & path,
EnumerateStreamsSettings & settings,
const StreamCallback & callback,
const SubstreamData & data) const override;

@ -283,7 +283,7 @@ void SerializationTuple::deserializeTextCSV(IColumn & column, ReadBuffer & istr,
}

void SerializationTuple::enumerateStreams(
SubstreamPath & path,
EnumerateStreamsSettings & settings,
const StreamCallback & callback,
const SubstreamData & data) const
{

@ -293,15 +293,12 @@ void SerializationTuple::enumerateStreams(
for (size_t i = 0; i < elems.size(); ++i)
{
SubstreamData next_data =
{
elems[i],
type_tuple ? type_tuple->getElement(i) : nullptr,
column_tuple ? column_tuple->getColumnPtr(i) : nullptr,
info_tuple ? info_tuple->getElementInfo(i) : nullptr,
};
auto next_data = SubstreamData(elems[i])
.withType(type_tuple ? type_tuple->getElement(i) : nullptr)
.withColumn(column_tuple ? column_tuple->getColumnPtr(i) : nullptr)
.withSerializationInfo(info_tuple ? info_tuple->getElementInfo(i) : nullptr);

elems[i]->enumerateStreams(path, callback, next_data);
elems[i]->enumerateStreams(settings, callback, next_data);
}
}

@ -34,7 +34,7 @@ public:
/** Each sub-column in a tuple is serialized in separate stream.
*/
void enumerateStreams(
SubstreamPath & path,
EnumerateStreamsSettings & settings,
const StreamCallback & callback,
const SubstreamData & data) const override;

@ -5,11 +5,11 @@ namespace DB
{

void SerializationWrapper::enumerateStreams(
SubstreamPath & path,
EnumerateStreamsSettings & settings,
const StreamCallback & callback,
const SubstreamData & data) const
{
nested_serialization->enumerateStreams(path, callback, data);
nested_serialization->enumerateStreams(settings, callback, data);
}

void SerializationWrapper::serializeBinaryBulkStatePrefix(

@ -21,7 +21,7 @@ public:
Kind getKind() const override { return nested_serialization->getKind(); }

void enumerateStreams(
SubstreamPath & path,
EnumerateStreamsSettings & settings,
const StreamCallback & callback,
const SubstreamData & data) const override;
@ -31,7 +31,7 @@ void FileCacheSettings::loadFromConfig(const Poco::Util::AbstractConfiguration &
enable_filesystem_query_cache_limit = config.getUInt64(config_prefix + ".enable_filesystem_query_cache_limit", false);
enable_cache_hits_threshold = config.getUInt64(config_prefix + ".enable_cache_hits_threshold", REMOTE_FS_OBJECTS_CACHE_ENABLE_HITS_THRESHOLD);

do_not_evict_index_and_mark_files = config.getUInt64(config_prefix + ".do_not_evict_index_and_mark_files", true);
do_not_evict_index_and_mark_files = config.getUInt64(config_prefix + ".do_not_evict_index_and_mark_files", false);
}

}

@ -58,6 +58,8 @@ public:
/// Add limits from external query.
void addStorageLimits(const StorageLimitsList & limits);

ContextPtr getContext() const { return context; }

protected:
ASTPtr query_ptr;
ContextMutablePtr context;

@ -163,7 +163,7 @@ BlockIO InterpreterDescribeQuery::execute()
res_columns[6]->insertDefault();

res_columns[7]->insert(1u);
}, { type->getDefaultSerialization(), type, nullptr, nullptr });
}, ISerialization::SubstreamData(type->getDefaultSerialization()).withType(type));
}
}

@ -316,7 +316,7 @@ QueryPipeline InterpreterExplainQuery::executeImpl()
interpreter.buildQueryPlan(plan);

if (settings.optimize)
plan.optimize(QueryPlanOptimizationSettings::fromContext(getContext()));
plan.optimize(QueryPlanOptimizationSettings::fromContext(interpreter.getContext()));

if (settings.json)
{

@ -326,7 +326,7 @@ QueryPipeline InterpreterExplainQuery::executeImpl()
auto plan_array = std::make_unique<JSONBuilder::JSONArray>();
plan_array->add(std::move(plan_map));

auto format_settings = getFormatSettings(getContext());
auto format_settings = getFormatSettings(interpreter.getContext());
format_settings.json.quote_64bit_integers = false;

JSONBuilder::FormatSettings json_format_settings{.settings = format_settings};
@ -12,6 +12,7 @@
#include <Parsers/ASTFunction.h>
#include <utility>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/ObjectUtils.h>
#include <Interpreters/RequiredSourceColumnsVisitor.h>
#include <Common/checkStackSize.h>
#include <Storages/ColumnsDescription.h>

@ -187,29 +188,56 @@ ActionsDAGPtr evaluateMissingDefaults(
return createExpressions(header, expr_list, save_unneeded_columns, context);
}

static bool arrayHasNoElementsRead(const IColumn & column)
static std::unordered_map<String, ColumnPtr> collectOffsetsColumns(
const NamesAndTypesList & available_columns, const Columns & res_columns)
{
const auto * column_array = typeid_cast<const ColumnArray *>(&column);
std::unordered_map<String, ColumnPtr> offsets_columns;

if (!column_array)
return false;
auto available_column = available_columns.begin();
for (size_t i = 0; i < available_columns.size(); ++i, ++available_column)
{
if (res_columns[i] == nullptr || isColumnConst(*res_columns[i]))
continue;

size_t size = column_array->size();
if (!size)
return false;
auto serialization = IDataType::getSerialization(*available_column);
serialization->enumerateStreams([&](const auto & subpath)
{
if (subpath.empty() || subpath.back().type != ISerialization::Substream::ArraySizes)
return;

size_t data_size = column_array->getData().size();
if (data_size)
return false;
auto stream_name = ISerialization::getFileNameForStream(*available_column, subpath);
const auto & current_offsets_column = subpath.back().data.column;

size_t last_offset = column_array->getOffsets()[size - 1];
return last_offset != 0;
/// If for some reason multiple offsets columns are present
/// for the same nested data structure, choose the one that is not empty.
if (current_offsets_column && !current_offsets_column->empty())
{
auto & offsets_column = offsets_columns[stream_name];
if (!offsets_column)
offsets_column = current_offsets_column;

#ifndef NDEBUG
const auto & offsets_data = assert_cast<const ColumnUInt64 &>(*offsets_column).getData();
const auto & current_offsets_data = assert_cast<const ColumnUInt64 &>(*current_offsets_column).getData();

if (offsets_data != current_offsets_data)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Found non-equal columns with offsets (sizes: {} and {}) for stream {}",
offsets_data.size(), current_offsets_data.size(), stream_name);
#endif
}
}, available_column->type, res_columns[i]);
}

return offsets_columns;
}
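The new collectOffsetsColumns above keeps only the substreams whose last path element is of type ArraySizes. A minimal illustrative sketch of that enumerate-and-filter pattern follows; it is not part of the commit, and the enum, path and stream names are simplified assumptions.

#include <functional>
#include <iostream>
#include <string>
#include <vector>

// Hypothetical minimal model of substream enumeration: a serialization walks its
// substream tree and calls the callback with the current path; callers decide what
// they care about by inspecting the type of the last path element.
enum class SubstreamType { Regular, ArraySizes };
struct Substream { SubstreamType type; std::string name; };
using SubstreamPath = std::vector<Substream>;
using StreamCallback = std::function<void(const SubstreamPath &)>;

void enumerateArrayStreams(const std::string & column, const StreamCallback & callback)
{
    SubstreamPath path;
    path.push_back({SubstreamType::ArraySizes, column + ".size0"}); // offsets stream first
    callback(path);
    path.back() = {SubstreamType::Regular, column};                 // then the elements stream
    callback(path);
    path.pop_back();
}

int main()
{
    std::vector<std::string> offsets_streams;
    enumerateArrayStreams("nested.a", [&](const SubstreamPath & path)
    {
        if (!path.empty() && path.back().type == SubstreamType::ArraySizes)
            offsets_streams.push_back(path.back().name);
    });

    for (const auto & name : offsets_streams)
        std::cout << name << "\n"; // prints "nested.a.size0"
    return 0;
}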
void fillMissingColumns(
Columns & res_columns,
size_t num_rows,
const NamesAndTypesList & requested_columns,
const NamesAndTypesList & available_columns,
const NameSet & partially_read_columns,
StorageMetadataPtr metadata_snapshot)
{
size_t num_columns = requested_columns.size();

@ -218,65 +246,79 @@ void fillMissingColumns(
"Invalid number of columns passed to fillMissingColumns. Expected {}, got {}",
num_columns, res_columns.size());

/// For a missing column of a nested data structure we must create not a column of empty
/// arrays, but a column of arrays of correct length.
/// For a missing column of a nested data structure
/// we must create not a column of empty arrays,
/// but a column of arrays of correct length.

/// First, collect offset columns for all arrays in the block.
auto offsets_columns = collectOffsetsColumns(available_columns, res_columns);

std::unordered_map<String, ColumnPtr> offset_columns;
/// Insert default values only for columns without default expressions.
auto requested_column = requested_columns.begin();
for (size_t i = 0; i < num_columns; ++i, ++requested_column)
{
if (res_columns[i] == nullptr)
continue;

if (const auto * array = typeid_cast<const ColumnArray *>(res_columns[i].get()))
{
String offsets_name = Nested::extractTableName(requested_column->name);
auto & offsets_column = offset_columns[offsets_name];

/// If for some reason multiple offsets columns are present for the same nested data structure,
/// choose the one that is not empty.
if (!offsets_column || offsets_column->empty())
offsets_column = array->getOffsetsPtr();
}
}

/// insert default values only for columns without default expressions
requested_column = requested_columns.begin();
for (size_t i = 0; i < num_columns; ++i, ++requested_column)
{
const auto & [name, type] = *requested_column;

if (res_columns[i] && arrayHasNoElementsRead(*res_columns[i]))
if (res_columns[i] && partially_read_columns.contains(name))
res_columns[i] = nullptr;

if (res_columns[i] == nullptr)
if (res_columns[i])
continue;

if (metadata_snapshot && metadata_snapshot->getColumns().hasDefault(name))
continue;

std::vector<ColumnPtr> current_offsets;
size_t num_dimensions = 0;

const auto * array_type = typeid_cast<const DataTypeArray *>(type.get());
if (array_type && !offsets_columns.empty())
{
if (metadata_snapshot && metadata_snapshot->getColumns().hasDefault(name))
continue;
num_dimensions = getNumberOfDimensions(*array_type);
current_offsets.resize(num_dimensions);

String offsets_name = Nested::extractTableName(name);
auto offset_it = offset_columns.find(offsets_name);
const auto * array_type = typeid_cast<const DataTypeArray *>(type.get());
if (offset_it != offset_columns.end() && array_type)
auto serialization = IDataType::getSerialization(*requested_column);
serialization->enumerateStreams([&](const auto & subpath)
{
const auto & nested_type = array_type->getNestedType();
ColumnPtr offsets_column = offset_it->second;
size_t nested_rows = typeid_cast<const ColumnUInt64 &>(*offsets_column).getData().back();
if (subpath.empty() || subpath.back().type != ISerialization::Substream::ArraySizes)
return;

ColumnPtr nested_column =
nested_type->createColumnConstWithDefaultValue(nested_rows)->convertToFullColumnIfConst();
size_t level = ISerialization::getArrayLevel(subpath);
assert(level < num_dimensions);

res_columns[i] = ColumnArray::create(nested_column, offsets_column);
}
else
auto stream_name = ISerialization::getFileNameForStream(*requested_column, subpath);
auto it = offsets_columns.find(stream_name);
if (it != offsets_columns.end())
current_offsets[level] = it->second;
});

for (size_t j = 0; j < num_dimensions; ++j)
{
/// We must turn a constant column into a full column because the interpreter could infer
/// that it is constant everywhere but in some blocks (from other parts) it can be a full column.
res_columns[i] = type->createColumnConstWithDefaultValue(num_rows)->convertToFullColumnIfConst();
if (!current_offsets[j])
{
current_offsets.resize(j);
break;
}
}
}

if (!current_offsets.empty())
{
size_t num_empty_dimensions = num_dimensions - current_offsets.size();
auto scalar_type = createArrayOfType(getBaseTypeOfArray(type), num_empty_dimensions);

size_t data_size = assert_cast<const ColumnUInt64 &>(*current_offsets.back()).getData().back();
res_columns[i] = scalar_type->createColumnConstWithDefaultValue(data_size)->convertToFullColumnIfConst();

for (auto it = current_offsets.rbegin(); it != current_offsets.rend(); ++it)
res_columns[i] = ColumnArray::create(res_columns[i], *it);
}
else
{
/// We must turn a constant column into a full column because the interpreter could infer
/// that it is constant everywhere but in some blocks (from other parts) it can be a full column.
res_columns[i] = type->createColumnConstWithDefaultValue(num_rows)->convertToFullColumnIfConst();
}
}
}
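The rewritten fillMissingColumns builds a missing Array column out of the offsets saved from a sibling column of the same Nested structure and default values of the right total length. A small standalone sketch of that idea, using plain vectors instead of ClickHouse columns (names and values are made up):

#include <cstdint>
#include <iostream>
#include <vector>

// Minimal sketch: reuse the offsets read for a sibling Nested column and fill the
// missing column's data part with default values, so every row keeps a consistent size.
int main()
{
    // Offsets of a present sibling column (rows of 2, 2 and 3 elements).
    std::vector<uint64_t> offsets{2, 4, 7};

    // The missing column gets default (zero) values of the matching total length.
    size_t data_size = offsets.empty() ? 0 : static_cast<size_t>(offsets.back());
    std::vector<int64_t> missing_data(data_size, 0);

    // Print the reconstructed arrays row by row.
    size_t prev = 0;
    for (uint64_t off : offsets)
    {
        std::cout << "[";
        for (size_t i = prev; i < off; ++i)
            std::cout << missing_data[i] << (i + 1 < off ? ", " : "");
        std::cout << "]\n";
        prev = static_cast<size_t>(off);
    }
    return 0;
}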
@ -1,5 +1,6 @@
#pragma once

#include <Core/Names.h>
#include <Interpreters/Context_fwd.h>
#include <Common/COW.h>

@ -43,6 +44,8 @@ void fillMissingColumns(
Columns & res_columns,
size_t num_rows,
const NamesAndTypesList & requested_columns,
const NamesAndTypesList & available_columns,
const NameSet & partially_read_columns,
StorageMetadataPtr metadata_snapshot);

}

@ -63,7 +63,7 @@ inline const auto & getOptimizations()
{tryMergeExpressions, "mergeExpressions", &QueryPlanOptimizationSettings::optimize_plan},
{tryPushDownFilter, "pushDownFilter", &QueryPlanOptimizationSettings::filter_push_down},
{tryExecuteFunctionsAfterSorting, "liftUpFunctions", &QueryPlanOptimizationSettings::optimize_plan},
{tryReuseStorageOrderingForWindowFunctions, "reuseStorageOrderingForWindowFunctions", &QueryPlanOptimizationSettings::optimize_plan}
{tryReuseStorageOrderingForWindowFunctions, "reuseStorageOrderingForWindowFunctions", &QueryPlanOptimizationSettings::optimize_read_in_window_order}
}};

return optimizations;

@ -11,6 +11,7 @@ QueryPlanOptimizationSettings QueryPlanOptimizationSettings::fromSettings(const
settings.optimize_plan = from.query_plan_enable_optimizations;
settings.max_optimizations_to_apply = from.query_plan_max_optimizations_to_apply;
settings.filter_push_down = from.query_plan_filter_push_down;
settings.optimize_read_in_window_order = from.optimize_read_in_window_order;
return settings;
}

@ -21,6 +21,9 @@ struct QueryPlanOptimizationSettings
/// If filter push down optimization is enabled.
bool filter_push_down = true;

/// window functions read in order optimization
bool optimize_read_in_window_order = true;

static QueryPlanOptimizationSettings fromSettings(const Settings & from);
static QueryPlanOptimizationSettings fromContext(ContextPtr from);
};

@ -61,12 +61,7 @@ size_t tryReuseStorageOrderingForWindowFunctions(QueryPlan::Node * parent_node,
return 0;
}

auto context = read_from_merge_tree->getContext();
if (!context->getSettings().optimize_read_in_window_order)
{
return 0;
}

const auto context = read_from_merge_tree->getContext();
const auto & query_info = read_from_merge_tree->getQueryInfo();
const auto * select_query = query_info.query->as<ASTSelectQuery>();

@ -780,7 +780,7 @@ void ColumnsDescription::addSubcolumns(const String & name_in_storage, const Dat
"Cannot add subcolumn {}: column with this name already exists", subcolumn.name);

subcolumns.get<0>().insert(std::move(subcolumn));
}, {type_in_storage->getDefaultSerialization(), type_in_storage, nullptr, nullptr});
}, ISerialization::SubstreamData(type_in_storage->getDefaultSerialization()).withType(type_in_storage));
}

void ColumnsDescription::removeSubcolumns(const String & name_in_storage)
@ -650,23 +650,31 @@ bool DataPartStorageOnDisk::shallParticipateInMerges(const IStoragePolicy & stor
}

void DataPartStorageOnDisk::backup(
TemporaryFilesOnDisks & temp_dirs,
const MergeTreeDataPartChecksums & checksums,
const NameSet & files_without_checksums,
const String & path_in_backup,
BackupEntries & backup_entries) const
BackupEntries & backup_entries,
bool make_temporary_hard_links,
TemporaryFilesOnDisks * temp_dirs) const
{
fs::path part_path_on_disk = fs::path{root_path} / part_dir;
fs::path part_path_in_backup = fs::path{path_in_backup} / part_dir;

auto disk = volume->getDisk();
auto temp_dir_it = temp_dirs.find(disk);
if (temp_dir_it == temp_dirs.end())
temp_dir_it = temp_dirs.emplace(disk, std::make_shared<TemporaryFileOnDisk>(disk, "tmp/")).first;
auto temp_dir_owner = temp_dir_it->second;
fs::path temp_dir = temp_dir_owner->getPath();
fs::path temp_part_dir = temp_dir / part_path_in_backup.relative_path();
disk->createDirectories(temp_part_dir);

fs::path temp_part_dir;
std::shared_ptr<TemporaryFileOnDisk> temp_dir_owner;
if (make_temporary_hard_links)
{
assert(temp_dirs);
auto temp_dir_it = temp_dirs->find(disk);
if (temp_dir_it == temp_dirs->end())
temp_dir_it = temp_dirs->emplace(disk, std::make_shared<TemporaryFileOnDisk>(disk, "tmp/")).first;
temp_dir_owner = temp_dir_it->second;
fs::path temp_dir = temp_dir_owner->getPath();
temp_part_dir = temp_dir / part_path_in_backup.relative_path();
disk->createDirectories(temp_part_dir);
}

/// For example,
/// part_path_in_backup = /data/test/table/0_1_1_0

@ -683,13 +691,18 @@ void DataPartStorageOnDisk::backup(
continue; /// Skip *.proj files - they're actually directories and will be handled.
String filepath_on_disk = part_path_on_disk / filepath;
String filepath_in_backup = part_path_in_backup / filepath;
String hardlink_filepath = temp_part_dir / filepath;

disk->createHardLink(filepath_on_disk, hardlink_filepath);
if (make_temporary_hard_links)
{
String hardlink_filepath = temp_part_dir / filepath;
disk->createHardLink(filepath_on_disk, hardlink_filepath);
filepath_on_disk = hardlink_filepath;
}

UInt128 file_hash{checksum.file_hash.first, checksum.file_hash.second};
backup_entries.emplace_back(
filepath_in_backup,
std::make_unique<BackupEntryFromImmutableFile>(disk, hardlink_filepath, checksum.file_size, file_hash, temp_dir_owner));
std::make_unique<BackupEntryFromImmutableFile>(disk, filepath_on_disk, checksum.file_size, file_hash, temp_dir_owner));
}

for (const auto & filepath : files_without_checksums)

@ -89,11 +89,12 @@ public:
bool shallParticipateInMerges(const IStoragePolicy &) const override;

void backup(
TemporaryFilesOnDisks & temp_dirs,
const MergeTreeDataPartChecksums & checksums,
const NameSet & files_without_checksums,
const String & path_in_backup,
BackupEntries & backup_entries) const override;
BackupEntries & backup_entries,
bool make_temporary_hard_links,
TemporaryFilesOnDisks * temp_dirs) const override;

DataPartStoragePtr freeze(
const std::string & to,

@ -422,8 +422,8 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchSelectedPart(
const auto data_settings = data.getSettings();

if (data_settings->allow_remote_fs_zero_copy_replication && !try_zero_copy)
LOG_WARNING(log, "Zero copy replication enabled, but trying to fetch part {} without zero copy", part_name);
if (data.canUseZeroCopyReplication() && !try_zero_copy)
LOG_INFO(log, "Zero copy replication enabled, but trying to fetch part {} without zero copy", part_name);

/// It should be "tmp-fetch_" and not "tmp_fetch_", because we can fetch part to detached/,
/// but detached part name prefix should not contain underscore.

@ -479,8 +479,8 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchSelectedPart(
}
else
{
if (data_settings->allow_remote_fs_zero_copy_replication)
LOG_WARNING(log, "Cannot select any zero-copy disk for {}", part_name);
if (data.canUseZeroCopyReplication())
LOG_INFO(log, "Cannot select any zero-copy disk for {}", part_name);

try_zero_copy = false;
}
@ -177,11 +177,12 @@ public:
/// Also creates a new tmp_dir for internal disk (if disk is mentioned the first time).
using TemporaryFilesOnDisks = std::map<DiskPtr, std::shared_ptr<TemporaryFileOnDisk>>;
virtual void backup(
TemporaryFilesOnDisks & temp_dirs,
const MergeTreeDataPartChecksums & checksums,
const NameSet & files_without_checksums,
const String & path_in_backup,
BackupEntries & backup_entries) const = 0;
BackupEntries & backup_entries,
bool make_temporary_hard_links,
TemporaryFilesOnDisks * temp_dirs) const = 0;

/// Creates hardlinks into 'to/dir_path' for every file in data part.
/// Callback is called after hardlinks are created, but before 'delete-on-destroy.txt' marker is removed.

@ -445,11 +445,11 @@ void IMergeTreeDataPart::setColumns(const NamesAndTypesList & new_columns, const
column_name_to_position.clear();
column_name_to_position.reserve(new_columns.size());
size_t pos = 0;
for (const auto & column : columns)
column_name_to_position.emplace(column.name, pos++);

for (const auto & column : columns)
{
column_name_to_position.emplace(column.name, pos++);

auto it = serialization_infos.find(column.name);
auto serialization = it == serialization_infos.end()
? IDataType::getSerialization(column)

@ -461,7 +461,7 @@ void IMergeTreeDataPart::setColumns(const NamesAndTypesList & new_columns, const
{
auto full_name = Nested::concatenateName(column.name, subname);
serializations.emplace(full_name, subdata.serialization);
}, {serialization, nullptr, nullptr, nullptr});
}, ISerialization::SubstreamData(serialization));
}

columns_description = ColumnsDescription(columns);

@ -1352,7 +1352,6 @@ bool IMergeTreeDataPart::assertHasValidVersionMetadata() const
}
}

void IMergeTreeDataPart::appendFilesOfColumns(Strings & files)
{
files.push_back("columns.txt");

@ -63,7 +63,13 @@ void IMergeTreeReader::fillMissingColumns(Columns & res_columns, bool & should_e
{
try
{
DB::fillMissingColumns(res_columns, num_rows, requested_columns, metadata_snapshot);
NamesAndTypesList available_columns(columns_to_read.begin(), columns_to_read.end());
DB::fillMissingColumns(
res_columns, num_rows,
Nested::convertToSubcolumns(requested_columns),
Nested::convertToSubcolumns(available_columns),
partially_read_columns, metadata_snapshot);

should_evaluate_missing_defaults = std::any_of(
res_columns.begin(), res_columns.end(), [](const auto & column) { return column == nullptr; });
}

@ -201,20 +207,56 @@ void IMergeTreeReader::performRequiredConversions(Columns & res_columns) const
}
}

IMergeTreeReader::ColumnPosition IMergeTreeReader::findColumnForOffsets(const String & column_name) const
IMergeTreeReader::ColumnPosition IMergeTreeReader::findColumnForOffsets(const NameAndTypePair & required_column) const
{
String table_name = Nested::extractTableName(column_name);
auto get_offsets_streams = [](const auto & serialization, const auto & name_in_storage)
{
Names offsets_streams;
serialization->enumerateStreams([&](const auto & subpath)
{
if (subpath.empty() || subpath.back().type != ISerialization::Substream::ArraySizes)
return;

auto subname = ISerialization::getSubcolumnNameForStream(subpath);
auto full_name = Nested::concatenateName(name_in_storage, subname);
offsets_streams.push_back(full_name);
});

return offsets_streams;
};

auto required_name_in_storage = Nested::extractTableName(required_column.getNameInStorage());
auto required_offsets_streams = get_offsets_streams(getSerializationInPart(required_column), required_name_in_storage);

size_t max_matched_streams = 0;
ColumnPosition position;

/// Find column that has maximal number of matching
/// offsets columns with required_column.
for (const auto & part_column : data_part_info_for_read->getColumns())
{
if (typeid_cast<const DataTypeArray *>(part_column.type.get()))
auto name_in_storage = Nested::extractTableName(part_column.name);
if (name_in_storage != required_name_in_storage)
continue;

auto offsets_streams = get_offsets_streams(data_part_info_for_read->getSerialization(part_column), name_in_storage);
NameSet offsets_streams_set(offsets_streams.begin(), offsets_streams.end());

size_t i = 0;
for (; i < required_offsets_streams.size(); ++i)
{
auto position = data_part_info_for_read->getColumnPosition(part_column.getNameInStorage());
if (position && Nested::extractTableName(part_column.name) == table_name)
return position;
if (!offsets_streams_set.contains(required_offsets_streams[i]))
break;
}

if (i && (!position || i > max_matched_streams))
{
max_matched_streams = i;
position = data_part_info_for_read->getColumnPosition(part_column.name);
}
}

return {};
return position;
}

void IMergeTreeReader::checkNumberOfColumns(size_t num_columns_to_read) const
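The rewritten findColumnForOffsets above no longer returns the first Array column of the same Nested table; it scores candidates by how many of the required offsets streams they provide. A small illustrative sketch of that selection rule, not part of the commit (stream names are invented for the example):

#include <iostream>
#include <set>
#include <string>
#include <vector>

// Hypothetical sketch of the matching rule: among the columns present in the part,
// pick the one whose offsets streams cover the longest prefix of the offsets streams
// required by the missing column.
int main()
{
    const std::vector<std::string> required{"n.size0", "n.a.size1"};

    struct PartColumn { std::string name; std::vector<std::string> offsets_streams; };
    const std::vector<PartColumn> part_columns{
        {"n.a", {"n.size0", "n.a.size1"}},
        {"n.b", {"n.size0"}},
    };

    size_t max_matched = 0;
    std::string best;
    for (const auto & column : part_columns)
    {
        std::set<std::string> streams(column.offsets_streams.begin(), column.offsets_streams.end());
        size_t matched = 0;
        while (matched < required.size() && streams.count(required[matched]))
            ++matched;
        if (matched > max_matched)
        {
            max_matched = matched;
            best = column.name;
        }
    }

    std::cout << "read offsets from: " << best << "\n"; // n.a
    return 0;
}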
|
@ -92,7 +92,9 @@ protected:
MarkRanges all_mark_ranges;

using ColumnPosition = std::optional<size_t>;
ColumnPosition findColumnForOffsets(const String & column_name) const;
ColumnPosition findColumnForOffsets(const NameAndTypePair & column) const;

NameSet partially_read_columns;

private:
/// Alter conversions, which must be applied on fly if required

@ -3,6 +3,7 @@
#include <Backups/BackupEntriesCollector.h>
#include <Backups/BackupEntryFromImmutableFile.h>
#include <Backups/BackupEntryFromSmallFile.h>
#include <Backups/BackupEntryWrappedWith.h>
#include <Backups/IBackup.h>
#include <Backups/RestorerFromBackup.h>
#include <Compression/CompressedReadBuffer.h>

@ -4109,29 +4110,74 @@ void MergeTreeData::backupData(BackupEntriesCollector & backup_entries_collector
else
data_parts = getVisibleDataPartsVector(local_context);

backup_entries_collector.addBackupEntries(backupParts(data_parts, data_path_in_backup));
backup_entries_collector.addBackupEntries(backupParts(data_parts, data_path_in_backup, local_context));
}

BackupEntries MergeTreeData::backupParts(const DataPartsVector & data_parts, const String & data_path_in_backup)
BackupEntries MergeTreeData::backupParts(const DataPartsVector & data_parts, const String & data_path_in_backup, const ContextPtr & local_context)
{
BackupEntries backup_entries;
std::map<DiskPtr, std::shared_ptr<TemporaryFileOnDisk>> temp_dirs;
TableLockHolder table_lock;

for (const auto & part : data_parts)
{
/// Hard links is the default way to ensure that we'll be keeping access to the files of parts.
bool make_temporary_hard_links = true;
bool hold_storage_and_part_ptrs = false;
bool hold_table_lock = false;

if (getStorageID().hasUUID())
{
/// Tables in atomic databases have UUIDs. When using atomic database we don't have to create hard links to make a backup,
/// we can just hold smart pointers to a storage and to data parts instead. That's enough to protect those files from deleting
/// until the backup is done (see the calls `part.unique()` in grabOldParts() and table.unique() in DatabaseCatalog).
make_temporary_hard_links = false;
hold_storage_and_part_ptrs = true;
}
else if (supportsReplication() && part->data_part_storage->supportZeroCopyReplication() && getSettings()->allow_remote_fs_zero_copy_replication)
{
/// Hard links don't work correctly with zero copy replication.
make_temporary_hard_links = false;
hold_storage_and_part_ptrs = true;
hold_table_lock = true;
}

if (hold_table_lock && !table_lock)
table_lock = lockForShare(local_context->getCurrentQueryId(), local_context->getSettingsRef().lock_acquire_timeout);

BackupEntries backup_entries_from_part;
part->data_part_storage->backup(
temp_dirs, part->checksums, part->getFileNamesWithoutChecksums(), data_path_in_backup, backup_entries);
part->checksums,
part->getFileNamesWithoutChecksums(),
data_path_in_backup,
backup_entries_from_part,
make_temporary_hard_links,
&temp_dirs);

auto projection_parts = part->getProjectionParts();
for (const auto & [projection_name, projection_part] : projection_parts)
{
projection_part->data_part_storage->backup(
temp_dirs,
projection_part->checksums,
projection_part->getFileNamesWithoutChecksums(),
fs::path{data_path_in_backup} / part->name,
backup_entries);
backup_entries_from_part,
make_temporary_hard_links,
&temp_dirs);
}

if (hold_storage_and_part_ptrs)
{
/// Wrap backup entries with smart pointers to data parts and to the storage itself
/// (we'll be holding those smart pointers for as long as we'll be using the backup entries).
auto storage_and_part = std::make_pair(shared_from_this(), part);
if (hold_table_lock)
wrapBackupEntriesWith(backup_entries_from_part, std::make_pair(storage_and_part, table_lock));
else
wrapBackupEntriesWith(backup_entries_from_part, storage_and_part);
}

insertAtEnd(backup_entries, std::move(backup_entries_from_part));
}

return backup_entries;
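The comments in the backupParts hunk explain the alternative to temporary hard links: keep smart pointers alive for as long as the backup entries exist. Below is an illustrative, self-contained sketch of that ownership idea, not the actual ClickHouse implementation; the Part and BackupEntry types are simplified assumptions.

#include <iostream>
#include <memory>
#include <string>
#include <vector>

// Hypothetical sketch of the "hold smart pointers instead of hard links" idea: each
// backup entry keeps a shared_ptr to whatever must stay alive (part, storage, lock),
// so the underlying files cannot be removed while the entries are still in use.
struct Part { std::string name; };

struct BackupEntry
{
    std::string path_in_backup;
    std::shared_ptr<void> holder; // generic owner, analogous to wrapping entries
};

int main()
{
    auto part = std::make_shared<Part>(Part{"0_1_1_0"});

    std::vector<BackupEntry> entries;
    entries.push_back({"data/test/table/0_1_1_0/data.bin", part});
    entries.push_back({"data/test/table/0_1_1_0/checksums.txt", part});

    // While the entries exist, the part is referenced and cannot be dropped.
    std::cout << "part use_count during backup: " << part.use_count() << "\n"; // 3
    return 0;
}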
|
@ -1231,7 +1231,7 @@ protected:
bool movePartsToSpace(const DataPartsVector & parts, SpacePtr space);

/// Makes backup entries to backup the parts of this table.
static BackupEntries backupParts(const DataPartsVector & data_parts, const String & data_path_in_backup);
BackupEntries backupParts(const DataPartsVector & data_parts, const String & data_path_in_backup, const ContextPtr & local_context);

class RestoredPartsHolder;

@ -66,8 +66,7 @@ void MergeTreeDataPartWriterCompact::addStreams(const NameAndTypePair & column,
compressed_streams.emplace(stream_name, stream);
};

ISerialization::SubstreamPath path;
data_part->getSerialization(column.name)->enumerateStreams(path, callback, column.type);
data_part->getSerialization(column.name)->enumerateStreams(callback, column.type);
}

namespace

@ -121,7 +121,7 @@ void MergeTreeDataPartWriterWide::addStreams(
};

ISerialization::SubstreamPath path;
data_part->getSerialization(column.name)->enumerateStreams(path, callback, column.type);
data_part->getSerialization(column.name)->enumerateStreams(callback, column.type);
}

@ -255,10 +255,9 @@ void MergeTreeDataPartWriterWide::write(const Block & block, const IColumn::Perm
void MergeTreeDataPartWriterWide::writeSingleMark(
const NameAndTypePair & column,
WrittenOffsetColumns & offset_columns,
size_t number_of_rows,
ISerialization::SubstreamPath & path)
size_t number_of_rows)
{
StreamsWithMarks marks = getCurrentMarksForColumn(column, offset_columns, path);
StreamsWithMarks marks = getCurrentMarksForColumn(column, offset_columns);
for (const auto & mark : marks)
flushMarkToFile(mark, number_of_rows);
}

@ -274,8 +273,7 @@ void MergeTreeDataPartWriterWide::flushMarkToFile(const StreamNameAndMark & stre
StreamsWithMarks MergeTreeDataPartWriterWide::getCurrentMarksForColumn(
const NameAndTypePair & column,
WrittenOffsetColumns & offset_columns,
ISerialization::SubstreamPath & path)
WrittenOffsetColumns & offset_columns)
{
StreamsWithMarks result;
data_part->getSerialization(column.name)->enumerateStreams([&] (const ISerialization::SubstreamPath & substream_path)

@ -300,7 +298,7 @@ StreamsWithMarks MergeTreeDataPartWriterWide::getCurrentMarksForColumn(
stream_with_mark.mark.offset_in_decompressed_block = stream.compressed.offset();

result.push_back(stream_with_mark);
}, path);
});

return result;
}

@ -328,7 +326,7 @@ void MergeTreeDataPartWriterWide::writeSingleGranule(
return;

column_streams[stream_name]->compressed.nextIfAtEnd();
}, serialize_settings.path);
});
}

/// Column must not be empty. (column.size() !== 0)

@ -366,7 +364,7 @@ void MergeTreeDataPartWriterWide::writeColumn(
{
if (last_non_written_marks.contains(name))
throw Exception(ErrorCodes::LOGICAL_ERROR, "We have to add new mark for column, but already have non written mark. Current mark {}, total marks {}, offset {}", getCurrentMark(), index_granularity.getMarksCount(), rows_written_in_last_mark);
last_non_written_marks[name] = getCurrentMarksForColumn(name_and_type, offset_columns, serialize_settings.path);
last_non_written_marks[name] = getCurrentMarksForColumn(name_and_type, offset_columns);
}

writeSingleGranule(

@ -390,7 +388,7 @@ void MergeTreeDataPartWriterWide::writeColumn(
}
}

serialization->enumerateStreams([&] (const ISerialization::SubstreamPath & substream_path)
serialization->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path)
{
bool is_offsets = !substream_path.empty() && substream_path.back().type == ISerialization::Substream::ArraySizes;
if (is_offsets)

@ -398,7 +396,7 @@ void MergeTreeDataPartWriterWide::writeColumn(
String stream_name = ISerialization::getFileNameForStream(name_and_type, substream_path);
offset_columns.insert(stream_name);
}
}, serialize_settings.path);
});
}

@ -553,7 +551,7 @@ void MergeTreeDataPartWriterWide::fillDataChecksums(IMergeTreeDataPart::Checksum
}

if (write_final_mark)
writeFinalMark(*it, offset_columns, serialize_settings.path);
writeFinalMark(*it, offset_columns);
}
}

@ -618,10 +616,9 @@ void MergeTreeDataPartWriterWide::finish(bool sync)

void MergeTreeDataPartWriterWide::writeFinalMark(
const NameAndTypePair & column,
WrittenOffsetColumns & offset_columns,
ISerialization::SubstreamPath & path)
WrittenOffsetColumns & offset_columns)
{
writeSingleMark(column, offset_columns, 0, path);
writeSingleMark(column, offset_columns, 0);
/// Memoize information about offsets
data_part->getSerialization(column.name)->enumerateStreams([&] (const ISerialization::SubstreamPath & substream_path)
{

@ -631,7 +628,7 @@ void MergeTreeDataPartWriterWide::writeFinalMark(
String stream_name = ISerialization::getFileNameForStream(column, substream_path);
offset_columns.insert(stream_name);
}
}, path);
});
}

static void fillIndexGranularityImpl(

@ -61,8 +61,7 @@ private:
/// Take offsets from column and return as MarkInCompressed file with stream name
StreamsWithMarks getCurrentMarksForColumn(
const NameAndTypePair & column,
WrittenOffsetColumns & offset_columns,
ISerialization::SubstreamPath & path);
WrittenOffsetColumns & offset_columns);

/// Write mark to disk using stream and rows count
void flushMarkToFile(

@ -73,13 +72,11 @@ private:
void writeSingleMark(
const NameAndTypePair & column,
WrittenOffsetColumns & offset_columns,
size_t number_of_rows,
ISerialization::SubstreamPath & path);
size_t number_of_rows);

void writeFinalMark(
const NameAndTypePair & column,
WrittenOffsetColumns & offset_columns,
ISerialization::SubstreamPath & path);
WrittenOffsetColumns & offset_columns);

void addStreams(
const NameAndTypePair & column,
@ -46,35 +46,7 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
{
try
{
size_t columns_num = columns_to_read.size();

column_positions.resize(columns_num);
read_only_offsets.resize(columns_num);

for (size_t i = 0; i < columns_num; ++i)
{
const auto & column_to_read = columns_to_read[i];

if (column_to_read.isSubcolumn())
{
auto storage_column_from_part = getColumnInPart(
{column_to_read.getNameInStorage(), column_to_read.getTypeInStorage()});

if (!storage_column_from_part.type->tryGetSubcolumnType(column_to_read.getSubcolumnName()))
continue;
}

auto position = data_part_info_for_read->getColumnPosition(column_to_read.getNameInStorage());
if (!position && typeid_cast<const DataTypeArray *>(column_to_read.type.get()))
{
/// If array of Nested column is missing in part,
/// we have to read its offsets if they exist.
position = findColumnForOffsets(column_to_read.name);
read_only_offsets[i] = (position != std::nullopt);
}

column_positions[i] = std::move(position);
}
fillColumnPositions();

/// Do not use max_read_buffer_size, but try to lower buffer size with maximal size of granule to avoid reading much data.
auto buffer_size = getReadBufferSize(*data_part_info_for_read, marks_loader, column_positions, all_mark_ranges);

@ -137,6 +109,44 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
}
}

void MergeTreeReaderCompact::fillColumnPositions()
{
size_t columns_num = columns_to_read.size();

column_positions.resize(columns_num);
read_only_offsets.resize(columns_num);

for (size_t i = 0; i < columns_num; ++i)
{
const auto & column_to_read = columns_to_read[i];

auto position = data_part_info_for_read->getColumnPosition(column_to_read.getNameInStorage());
bool is_array = isArray(column_to_read.type);

if (column_to_read.isSubcolumn())
{
auto storage_column_from_part = getColumnInPart(
{column_to_read.getNameInStorage(), column_to_read.getTypeInStorage()});

auto subcolumn_name = column_to_read.getSubcolumnName();
if (!storage_column_from_part.type->hasSubcolumn(subcolumn_name))
position.reset();
}

if (!position && is_array)
{
/// If array of Nested column is missing in part,
/// we have to read its offsets if they exist.
position = findColumnForOffsets(column_to_read);
read_only_offsets[i] = (position != std::nullopt);
}

column_positions[i] = std::move(position);
if (read_only_offsets[i])
partially_read_columns.insert(column_to_read.name);
}
}

size_t MergeTreeReaderCompact::readRows(
size_t from_mark, size_t current_task_last_mark, bool continue_reading, size_t max_rows_to_read, Columns & res_columns)
{

@ -214,7 +224,8 @@ void MergeTreeReaderCompact::readData(

auto buffer_getter = [&](const ISerialization::SubstreamPath & substream_path) -> ReadBuffer *
{
if (only_offsets && (substream_path.size() != 1 || substream_path[0].type != ISerialization::Substream::ArraySizes))
bool is_offsets = !substream_path.empty() && substream_path.back().type == ISerialization::Substream::ArraySizes;
if (only_offsets && !is_offsets)
return nullptr;

return data_buffer;

@ -39,6 +39,7 @@ public:

private:
bool isContinuousReading(size_t mark, size_t column_position);
void fillColumnPositions();

ReadBuffer * data_buffer;
CompressedReadBufferBase * compressed_data_buffer;

@ -33,13 +33,19 @@ MergeTreeReaderInMemory::MergeTreeReaderInMemory(
{})
, part_in_memory(std::move(data_part_))
{
for (const auto & [name, type] : columns_to_read)
for (const auto & column_to_read : columns_to_read)
{
/// If array of Nested column is missing in part,
/// we have to read its offsets if they exist.
if (!part_in_memory->block.has(name) && typeid_cast<const DataTypeArray *>(type.get()))
if (auto offset_position = findColumnForOffsets(name))
positions_for_offsets[name] = *offset_position;
if (typeid_cast<const DataTypeArray *>(column_to_read.type.get())
&& !tryGetColumnFromBlock(part_in_memory->block, column_to_read))
{
if (auto offsets_position = findColumnForOffsets(column_to_read))
{
positions_for_offsets[column_to_read.name] = *offsets_position;
partially_read_columns.insert(column_to_read.name);
}
}
}
}

@ -16,7 +16,6 @@ namespace DB

namespace
{
using OffsetColumns = std::map<std::string, ColumnPtr>;
constexpr auto DATA_FILE_EXTENSION = ".bin";
}

@ -160,12 +159,18 @@ void MergeTreeReaderWide::addStreams(
const ReadBufferFromFileBase::ProfileCallback & profile_callback,
clockid_t clock_type)
{
bool has_any_stream = false;
bool has_all_streams = true;

ISerialization::StreamCallback callback = [&] (const ISerialization::SubstreamPath & substream_path)
{
String stream_name = ISerialization::getFileNameForStream(name_and_type, substream_path);

if (streams.contains(stream_name))
{
has_any_stream = true;
return;
}

bool data_file_exists = data_part_info_for_read->getChecksums().files.contains(stream_name + DATA_FILE_EXTENSION);

@ -173,8 +178,12 @@ void MergeTreeReaderWide::addStreams(
* It is necessary since it allows to add new column to structure of the table without creating new files for old parts.
*/
if (!data_file_exists)
{
has_all_streams = false;
return;
}

has_any_stream = true;
bool is_lc_dict = substream_path.size() > 1 && substream_path[substream_path.size() - 2].type == ISerialization::Substream::Type::DictionaryKeys;

streams.emplace(stream_name, std::make_unique<MergeTreeReaderStream>(

@ -186,6 +195,9 @@ void MergeTreeReaderWide::addStreams(
};

serialization->enumerateStreams(callback);

if (has_any_stream && !has_all_streams)
partially_read_columns.insert(name_and_type.name);
}
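The addStreams hunk above marks a column as partially read when some of its stream files exist in the part but not all of them. A minimal illustrative sketch of that bookkeeping follows; it is not part of the commit and the file names are invented for the example.

#include <iostream>
#include <set>
#include <string>
#include <vector>

// Hypothetical sketch of the has_any_stream / has_all_streams check: a column is
// partially read when only a subset of its streams survived in the part, e.g. only
// the offsets file of a Nested column.
int main()
{
    const std::set<std::string> files_in_part{"n.a.size0.bin"};
    const std::vector<std::string> column_streams{"n.a.size0.bin", "n.a.bin"};

    bool has_any_stream = false;
    bool has_all_streams = true;
    for (const auto & stream : column_streams)
    {
        if (files_in_part.count(stream))
            has_any_stream = true;
        else
            has_all_streams = false;
    }

    if (has_any_stream && !has_all_streams)
        std::cout << "column is partially read\n";
    return 0;
}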
@ -283,6 +295,7 @@ void MergeTreeReaderWide::readData(
/* seek_to_start = */false, substream_path, streams, name_and_type, from_mark,
seek_to_mark, current_task_last_mark, cache);
};

deserialize_settings.continuous_reading = continue_reading;
auto & deserialize_state = deserialize_binary_bulk_state_map[name_and_type.name];

@ -1302,7 +1302,7 @@ private:
*ctx->source_part->data_part_storage, it->name(), destination);
hardlinked_files.insert(it->name());
}
else if (!endsWith(".tmp_proj", it->name())) // ignore projection tmp merge dir
else if (!endsWith(it->name(), ".tmp_proj")) // ignore projection tmp merge dir
{
// it's a projection part directory
ctx->data_part_storage_builder->createProjection(destination);
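The MutateTask hunk above fixes a swapped argument order: the suffix must be the second argument of endsWith. A short illustrative sketch of why the old call never matched, using a minimal local helper assumed to mirror the project's endsWith (not the actual ClickHouse code):

#include <iostream>
#include <string>

// Minimal local stand-in: endsWith(str, suffix) checks whether `str` ends with `suffix`.
static bool endsWith(const std::string & str, const std::string & suffix)
{
    return str.size() >= suffix.size()
        && str.compare(str.size() - suffix.size(), suffix.size(), suffix) == 0;
}

int main()
{
    const std::string name = "proj_1.tmp_proj";

    std::cout << std::boolalpha
              << endsWith(name, ".tmp_proj") << "\n"   // true: correct order, tmp dir is skipped
              << endsWith(".tmp_proj", name) << "\n";  // false: swapped arguments never match
    return 0;
}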
@ -95,7 +95,7 @@ protected:
++name_and_type;
}

fillMissingColumns(columns, src.rows(), column_names_and_types, /*metadata_snapshot=*/ nullptr);
fillMissingColumns(columns, src.rows(), column_names_and_types, column_names_and_types, {}, nullptr);
assert(std::all_of(columns.begin(), columns.end(), [](const auto & column) { return column != nullptr; }));

return Chunk(std::move(columns), src.rows());

@ -1785,7 +1785,7 @@ void StorageMergeTree::backupData(BackupEntriesCollector & backup_entries_collec
for (const auto & data_part : data_parts)
min_data_version = std::min(min_data_version, data_part->info.getDataVersion());

backup_entries_collector.addBackupEntries(backupParts(data_parts, data_path_in_backup));
backup_entries_collector.addBackupEntries(backupParts(data_parts, data_path_in_backup, local_context));
backup_entries_collector.addBackupEntries(backupMutations(min_data_version + 1, data_path_in_backup));
}

@ -7336,6 +7336,21 @@ CheckResults StorageReplicatedMergeTree::checkData(const ASTPtr & query, Context
}

bool StorageReplicatedMergeTree::canUseZeroCopyReplication() const
{
auto settings_ptr = getSettings();
if (!settings_ptr->allow_remote_fs_zero_copy_replication)
return false;

auto disks = getStoragePolicy()->getDisks();
for (const auto & disk : disks)
{
if (disk->supportZeroCopyReplication())
return true;
}
return false;
}

void StorageReplicatedMergeTree::checkBrokenDisks()
{
auto disks = getStoragePolicy()->getDisks();
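The new canUseZeroCopyReplication above combines the table setting with a scan over the storage policy's disks. An illustrative standalone sketch of that check, with simplified stand-ins for the settings and disk interfaces (not the actual ClickHouse types):

#include <algorithm>
#include <iostream>
#include <vector>

// Sketch: zero-copy replication is usable only when the setting is enabled and at
// least one disk of the storage policy supports it.
struct Disk { bool supports_zero_copy = false; };

bool canUseZeroCopyReplication(bool allow_remote_fs_zero_copy_replication, const std::vector<Disk> & disks)
{
    if (!allow_remote_fs_zero_copy_replication)
        return false;
    return std::any_of(disks.begin(), disks.end(), [](const Disk & disk) { return disk.supports_zero_copy; });
}

int main()
{
    const std::vector<Disk> disks{{false}, {true}};
    std::cout << std::boolalpha
              << canUseZeroCopyReplication(true, disks) << "\n"   // true
              << canUseZeroCopyReplication(false, disks) << "\n"; // false
    return 0;
}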
@ -8290,7 +8305,7 @@ void StorageReplicatedMergeTree::backupData(
|
||||
else
|
||||
data_parts = getVisibleDataPartsVector(local_context);
|
||||
|
||||
auto backup_entries = backupParts(data_parts, "");
|
||||
auto backup_entries = backupParts(data_parts, /* data_path_in_backup */ "", local_context);
|
||||
|
||||
auto coordination = backup_entries_collector.getBackupCoordination();
|
||||
String shared_id = getTableSharedID();
|
||||
|
@ -327,6 +327,7 @@ public:
|
||||
static bool removeSharedDetachedPart(DiskPtr disk, const String & path, const String & part_name, const String & table_uuid,
|
||||
const String & zookeeper_name, const String & replica_name, const String & zookeeper_path, ContextPtr local_context);
|
||||
|
||||
bool canUseZeroCopyReplication() const;
|
||||
private:
|
||||
std::atomic_bool are_restoring_replica {false};
|
||||
|
||||
|
@ -242,7 +242,7 @@ void StorageSystemPartsColumns::processNextStorage(
|
||||
IDataType::forEachSubcolumn([&](const auto & subpath, const auto & name, const auto & data)
|
||||
{
|
||||
/// We count only final subcolumns, which are represented by files on disk
|
||||
/// and skip intermediate suibcolumns of types Tuple and Nested.
|
||||
/// and skip intermediate subcolumns of types Tuple and Nested.
|
||||
if (isTuple(data.type) || isNested(data.type))
|
||||
return;
|
||||
|
||||
@ -270,7 +270,7 @@ void StorageSystemPartsColumns::processNextStorage(
|
||||
subcolumn_data_uncompressed_bytes.push_back(size.data_uncompressed);
|
||||
subcolumn_marks_bytes.push_back(size.marks);
|
||||
|
||||
}, { serialization, column.type, nullptr, nullptr });
|
||||
}, ISerialization::SubstreamData(serialization).withType(column.type));
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
columns[res_index++]->insert(subcolumn_names);
|
||||
|
@ -17,7 +17,7 @@ from env_helper import (
|
||||
from s3_helper import S3Helper
|
||||
from get_robot_token import get_best_robot_token
|
||||
from pr_info import PRInfo
|
||||
from build_download_helper import get_build_name_for_check, get_build_urls
|
||||
from build_download_helper import get_build_name_for_check, read_build_urls
|
||||
from docker_pull_helper import get_image_with_version
|
||||
from commit_status_helper import post_commit_status
|
||||
from clickhouse_helper import ClickHouseHelper, prepare_tests_results_for_clickhouse
|
||||
@ -29,7 +29,11 @@ IMAGE_NAME = "clickhouse/fuzzer"
|
||||
|
||||
def get_run_command(pr_number, sha, download_url, workspace_path, image):
|
||||
return (
|
||||
f"docker run --network=host --volume={workspace_path}:/workspace "
|
||||
f"docker run "
|
||||
# For sysctl
|
||||
"--privileged "
|
||||
"--network=host "
|
||||
f"--volume={workspace_path}:/workspace "
|
||||
"--cap-add syslog --cap-add sys_admin --cap-add=SYS_PTRACE "
|
||||
f'-e PR_TO_TEST={pr_number} -e SHA_TO_TEST={sha} -e BINARY_URL_TO_DOWNLOAD="{download_url}" '
|
||||
f"{image}"
|
||||
@ -69,7 +73,7 @@ if __name__ == "__main__":
|
||||
|
||||
build_name = get_build_name_for_check(check_name)
|
||||
print(build_name)
|
||||
urls = get_build_urls(build_name, reports_path)
|
||||
urls = read_build_urls(build_name, reports_path)
|
||||
if not urls:
|
||||
raise Exception("No build URLs found")
|
||||
|
||||
|
@ -1,11 +1,11 @@
#!/usr/bin/env python3

import os
import json
import logging
import os
import sys
import time
from typing import Optional
from typing import List, Optional

import requests # type: ignore

@ -41,11 +41,11 @@ def get_with_retries(
return response

def get_build_name_for_check(check_name):
def get_build_name_for_check(check_name) -> str:
return CI_CONFIG["tests_config"][check_name]["required_build"]

def get_build_urls(build_name, reports_path):
def read_build_urls(build_name, reports_path) -> List[str]:
for root, _, files in os.walk(reports_path):
for f in files:
if build_name in f:

@ -56,7 +56,7 @@ def get_build_urls(build_name, reports_path):
return []

def dowload_build_with_progress(url, path):
def download_build_with_progress(url, path):
logging.info("Downloading from %s to temp path %s", url, path)
for i in range(DOWNLOAD_RETRIES_COUNT):
try:

@ -104,14 +104,14 @@ def download_builds(result_path, build_urls, filter_fn):
if filter_fn(url):
fname = os.path.basename(url.replace("%2B", "+").replace("%20", " "))
logging.info("Will download %s to %s", fname, result_path)
dowload_build_with_progress(url, os.path.join(result_path, fname))
download_build_with_progress(url, os.path.join(result_path, fname))

def download_builds_filter(
check_name, reports_path, result_path, filter_fn=lambda _: True
):
build_name = get_build_name_for_check(check_name)
urls = get_build_urls(build_name, reports_path)
urls = read_build_urls(build_name, reports_path)
print(urls)

if not urls:

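For illustration only, not part of the diff: how a check script might use the renamed helpers together; the check name and directories below are hypothetical.

from build_download_helper import (
    get_build_name_for_check,
    read_build_urls,
    download_build_with_progress,
)

# Hypothetical check name and paths, only to show the call sequence.
build_name = get_build_name_for_check("AST fuzzer (asan)")
urls = read_build_urls(build_name, "/tmp/reports")
for url in urls:
    # Pick the standalone binary artifact and download it with progress logging.
    if url.endswith("/clickhouse"):
        download_build_with_progress(url, "/tmp/workspace/clickhouse")
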
79
tests/ci/download_binary.py
Executable file
@ -0,0 +1,79 @@
#!/usr/bin/env python
"""
This file is needed to avoid a circular import build_download_helper.py <=> env_helper.py
"""

import argparse
import logging
import os

from build_download_helper import download_build_with_progress
from ci_config import CI_CONFIG, BuildConfig
from env_helper import RUNNER_TEMP, S3_ARTIFACT_DOWNLOAD_TEMPLATE
from git_helper import Git, commit
from version_helper import get_version_from_repo, version_arg

TEMP_PATH = os.path.join(RUNNER_TEMP, "download_binary")

def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
description="Script to download binary artifacts from S3. Downloaded artifacts "
"are renamed to clickhouse-{static_binary_name}",
)
parser.add_argument(
"--version",
type=version_arg,
default=get_version_from_repo().string,
help="a version to generate a download url, get from the repo by default",
)
parser.add_argument(
"--commit",
type=commit,
default=Git(True).sha,
help="a commit sha to generate a download url, get from the repo by default",
)
parser.add_argument("--rename", default=True, help=argparse.SUPPRESS)
parser.add_argument(
"--no-rename",
dest="rename",
action="store_false",
default=argparse.SUPPRESS,
help="if set, the downloaded binary won't be renamed to "
"clickhouse-{static_binary_name}, makes sense only for a single build name",
)
parser.add_argument(
"build_names",
nargs="+",
help="the build names to download",
)
args = parser.parse_args()
if not args.rename and len(args.build_names) > 1:
parser.error("`--no-rename` shouldn't be used with more than one build name")
return args

def main():
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")
args = parse_args()
os.makedirs(TEMP_PATH, exist_ok=True)
for build in args.build_names:
# check if it's in CI_CONFIG
config = CI_CONFIG["build_config"][build] # type: BuildConfig
if args.rename:
path = os.path.join(TEMP_PATH, f"clickhouse-{config['static_binary_name']}")
else:
path = os.path.join(TEMP_PATH, "clickhouse")

url = S3_ARTIFACT_DOWNLOAD_TEMPLATE.format(
pr_or_release=f"{args.version.major}.{args.version.minor}",
commit=args.commit,
build_name=build,
artifact="clickhouse",
)
download_build_with_progress(url, path)

if __name__ == "__main__":
main()

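For illustration only, not part of the diff: roughly what one iteration of main() computes for a single hypothetical build name; the static binary name, version and commit below are made up.

# Hypothetical values, only to show how the target path and URL are derived.
build = "binary_darwin"
static_binary_name = "macos"  # assumed mapping from CI_CONFIG["build_config"][build]
version_major_minor = "22.9"
commit_sha = "0123456789abcdef0123456789abcdef01234567"

# With the default --rename, the artifact lands under a build-specific name.
path = f"/tmp/download_binary/clickhouse-{static_binary_name}"
url = (
    "https://s3.amazonaws.com/clickhouse-builds/"
    f"{version_major_minor}/{commit_sha}/{build}/clickhouse"
)
# download_build_with_progress(url, path) then fetches the binary to `path`.
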
@ -22,10 +22,14 @@ IMAGES_PATH = os.getenv("IMAGES_PATH", TEMP_PATH)
REPORTS_PATH = os.getenv("REPORTS_PATH", p.abspath(p.join(module_dir, "./reports")))
REPO_COPY = os.getenv("REPO_COPY", git_root)
RUNNER_TEMP = os.getenv("RUNNER_TEMP", p.abspath(p.join(module_dir, "./tmp")))
S3_URL = os.getenv("S3_URL", "https://s3.amazonaws.com")
S3_DOWNLOAD = os.getenv("S3_DOWNLOAD", S3_URL)
S3_BUILDS_BUCKET = os.getenv("S3_BUILDS_BUCKET", "clickhouse-builds")
S3_TEST_REPORTS_BUCKET = os.getenv("S3_TEST_REPORTS_BUCKET", "clickhouse-test-reports")
S3_URL = os.getenv("S3_URL", "https://s3.amazonaws.com")
S3_DOWNLOAD = os.getenv("S3_DOWNLOAD", S3_URL)
S3_ARTIFACT_DOWNLOAD_TEMPLATE = (
f"{S3_DOWNLOAD}/{S3_BUILDS_BUCKET}/"
"{pr_or_release}/{commit}/{build_name}/{artifact}"
)

# These parameters are set only on demand, and only once
_GITHUB_JOB_ID = ""

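For illustration only, not part of the diff: with the default S3_URL and S3_DOWNLOAD, the new template expands as sketched here; the PR number, commit and package name are made up.

S3_ARTIFACT_DOWNLOAD_TEMPLATE = (
    "https://s3.amazonaws.com/clickhouse-builds/"
    "{pr_or_release}/{commit}/{build_name}/{artifact}"
)

url = S3_ARTIFACT_DOWNLOAD_TEMPLATE.format(
    pr_or_release=12345,  # a PR number, or e.g. "22.9" for a release branch
    commit="0123456789abcdef0123456789abcdef01234567",
    build_name="package_release",
    artifact="clickhouse-common-static_22.9.1.1_amd64.deb",
)
# -> https://s3.amazonaws.com/clickhouse-builds/12345/<commit>/package_release/clickhouse-common-static_22.9.1.1_amd64.deb
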
@ -8,8 +8,8 @@ from collections import namedtuple
from typing import Dict, List, Tuple

from artifactory import ArtifactorySaaSPath # type: ignore
from build_download_helper import dowload_build_with_progress
from env_helper import RUNNER_TEMP, S3_BUILDS_BUCKET, S3_DOWNLOAD
from build_download_helper import download_build_with_progress
from env_helper import S3_ARTIFACT_DOWNLOAD_TEMPLATE, RUNNER_TEMP
from git_helper import TAG_REGEXP, commit, removeprefix, removesuffix

@ -97,18 +97,6 @@ class Packages:

class S3:
template = (
f"{S3_DOWNLOAD}/"
# "clickhouse-builds/"
f"{S3_BUILDS_BUCKET}/"
# "33333/" or "21.11/" from --release, if pull request is omitted
"{pr}/"
# "2bef313f75e4cacc6ea2ef2133e8849ecf0385ec/"
"{commit}/"
# "package_release/clickhouse-common-static_21.11.5.0_amd64.deb"
"{s3_path_suffix}"
)

def __init__(
self,
pr: int,

@ -117,7 +105,7 @@ class S3:
force_download: bool,
):
self._common = dict(
pr=pr,
pr_or_release=pr,
commit=commit,
)
self.force_download = force_download

@ -133,18 +121,19 @@ class S3:
self.packages.replace_with_fallback(package_file)

return
url = self.template.format_map(
{**self._common, "s3_path_suffix": s3_path_suffix}
build_name, artifact = s3_path_suffix.split("/")
url = S3_ARTIFACT_DOWNLOAD_TEMPLATE.format_map(
{**self._common, "build_name": build_name, "artifact": artifact}
)
try:
dowload_build_with_progress(url, path)
download_build_with_progress(url, path)
except Exception as e:
if "Cannot download dataset from" in e.args[0]:
new_url = Packages.fallback_to_all(url)
logging.warning(
"Fallback downloading %s for old release", fallback_path
)
dowload_build_with_progress(new_url, fallback_path)
download_build_with_progress(new_url, fallback_path)
self.packages.replace_with_fallback(package_file)

def download_deb(self):

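For illustration only, not part of the diff: how the rewritten download path maps an existing s3_path_suffix onto the shared template; the suffix mirrors the comment from the removed block.

s3_path_suffix = "package_release/clickhouse-common-static_21.11.5.0_amd64.deb"
build_name, artifact = s3_path_suffix.split("/")
# build_name == "package_release"
# artifact  == "clickhouse-common-static_21.11.5.0_amd64.deb"
# Together with pr_or_release and commit from self._common, these two values
# fill S3_ARTIFACT_DOWNLOAD_TEMPLATE via format_map, the same template that
# download_binary.py uses.
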
@ -33,7 +33,7 @@ def get_run_command(
"docker run --cap-add=SYS_PTRACE "
# a static link, don't use S3_URL or S3_DOWNLOAD
"-e S3_URL='https://s3.amazonaws.com/clickhouse-datasets' "
# For dmesg
# For dmesg and sysctl
"--privileged "
f"--volume={build_path}:/package_folder "
f"--volume={result_folder}:/test_output "

@ -20,7 +20,7 @@ const char * auto_contributors[] {{

VERSIONS = Dict[str, Union[int, str]]

VERSIONS_TEMPLATE = """# This variables autochanged by release_lib.sh:
VERSIONS_TEMPLATE = """# This variables autochanged by tests/ci/version_helper.py:

# NOTE: has nothing common with DBMS_TCP_PROTOCOL_VERSION,
# only DBMS_TCP_PROTOCOL_VERSION should be incremented on protocol changes.

@ -29,7 +29,6 @@ def generate_cluster_def():

main_configs = ["configs/backups_disk.xml", generate_cluster_def()]

user_configs = ["configs/allow_database_types.xml"]

nodes = []

@ -175,11 +174,21 @@ def test_concurrent_backups_on_different_nodes():

@pytest.mark.parametrize(
"db_engine, table_engine",
[("Replicated", "ReplicatedMergeTree"), ("Ordinary", "MergeTree")],
[
("Ordinary", "MergeTree"),
("Atomic", "MergeTree"),
("Replicated", "ReplicatedMergeTree"),
("Memory", "MergeTree"),
("Lazy", "Log"),
],
)
def test_create_or_drop_tables_during_backup(db_engine, table_engine):
if db_engine == "Replicated":
db_engine = "Replicated('/clickhouse/path/','{shard}','{replica}')"

if db_engine == "Lazy":
db_engine = "Lazy(20)"

if table_engine.endswith("MergeTree"):
table_engine += " ORDER BY tuple()"

@ -189,7 +198,7 @@ def test_create_or_drop_tables_during_backup(db_engine, table_engine):
start_time = time.time()
end_time = start_time + 60

def create_table():
def create_tables():
while time.time() < end_time:
node = nodes[randint(0, num_nodes - 1)]
table_name = f"mydb.tbl{randint(1, num_nodes)}"

@ -200,13 +209,13 @@ def test_create_or_drop_tables_during_backup(db_engine, table_engine):
f"INSERT INTO {table_name} SELECT rand32() FROM numbers(10)"
)

def drop_table():
def drop_tables():
while time.time() < end_time:
table_name = f"mydb.tbl{randint(1, num_nodes)}"
node = nodes[randint(0, num_nodes - 1)]
node.query(f"DROP TABLE IF EXISTS {table_name} NO DELAY")

def rename_table():
def rename_tables():
while time.time() < end_time:
table_name1 = f"mydb.tbl{randint(1, num_nodes)}"
table_name2 = f"mydb.tbl{randint(1, num_nodes)}"

@ -215,7 +224,13 @@ def test_create_or_drop_tables_during_backup(db_engine, table_engine):
f"RENAME TABLE {table_name1} TO {table_name2}"
)

def make_backup():
def truncate_tables():
while time.time() < end_time:
table_name = f"mydb.tbl{randint(1, num_nodes)}"
node = nodes[randint(0, num_nodes - 1)]
node.query(f"TRUNCATE TABLE IF EXISTS {table_name} NO DELAY")

def make_backups():
ids = []
while time.time() < end_time:
time.sleep(

@ -231,11 +246,12 @@ def test_create_or_drop_tables_during_backup(db_engine, table_engine):
ids = []
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
futures = []
ids_future = executor.submit(make_backup)
ids_future = executor.submit(make_backups)
futures.append(ids_future)
futures.append(executor.submit(create_table))
futures.append(executor.submit(drop_table))
futures.append(executor.submit(rename_table))
futures.append(executor.submit(create_tables))
futures.append(executor.submit(drop_tables))
futures.append(executor.submit(rename_tables))
futures.append(executor.submit(truncate_tables))
for future in futures:
future.result()
ids = ids_future.result()

@ -0,0 +1 @@
999

16
tests/queries/0_stateless/00488_column_name_primary.sql
Normal file
@ -0,0 +1,16 @@
DROP TABLE IF EXISTS primary;

CREATE TABLE primary
(
`primary` String
)
ENGINE = MergeTree
ORDER BY primary
settings min_bytes_for_wide_part=0,min_bytes_for_wide_part=0
AS
SELECT *
FROM numbers(1000);

select max(primary) from primary;

DROP TABLE primary;

@ -5,13 +5,15 @@ DROP TABLE IF EXISTS restore_01640;

CREATE TABLE test_01640(i Int64, d Date, s String)
ENGINE = ReplicatedMergeTree('/clickhouse/{database}/{shard}/tables/test_01640','{replica}')
PARTITION BY toYYYYMM(d) ORDER BY i;
PARTITION BY toYYYYMM(d) ORDER BY i
SETTINGS allow_remote_fs_zero_copy_replication=0;

insert into test_01640 values (1, '2021-01-01','some');

CREATE TABLE restore_01640(i Int64, d Date, s String)
ENGINE = ReplicatedMergeTree('/clickhouse/{database}/{shard}/tables/restore_01640','{replica}')
PARTITION BY toYYYYMM(d) ORDER BY i;
PARTITION BY toYYYYMM(d) ORDER BY i
SETTINGS allow_remote_fs_zero_copy_replication=0;

ALTER TABLE restore_01640 FETCH PARTITION tuple(toYYYYMM(toDate('2021-01-01')))
FROM '/clickhouse/{database}/{shard}/tables/test_01640';

27
tests/queries/0_stateless/01825_type_json_17.reference
Normal file
@ -0,0 +1,27 @@
Tuple(arr Nested(k1 Nested(k2 String, k3 String, k4 Int8), k5 Tuple(k6 String)), id Int8)
{"obj":{"arr":[{"k1":[{"k2":"aaa","k3":"bbb","k4":0},{"k2":"ccc","k3":"","k4":0}],"k5":{"k6":""}}],"id":1}}
{"obj":{"arr":[{"k1":[{"k2":"","k3":"ddd","k4":10},{"k2":"","k3":"","k4":20}],"k5":{"k6":"foo"}}],"id":2}}
[['bbb','']] [['aaa','ccc']]
[['ddd','']] [['','']]
1
[[0,0]]
[[10,20]]
Tuple(arr Nested(k1 Nested(k2 String, k3 Nested(k4 Int8))), id Int8)
{"obj":{"arr":[{"k1":[{"k2":"aaa","k3":[]}]}],"id":1}}
{"obj":{"arr":[{"k1":[{"k2":"bbb","k3":[{"k4":10}]},{"k2":"ccc","k3":[{"k4":20}]}]}],"id":2}}
[['aaa']] [[[]]]
[['bbb','ccc']] [[[10],[20]]]
1
[[[]]]
[[[10],[20]]]
Tuple(arr Nested(k1 Nested(k2 String, k4 Nested(k5 Int8)), k3 String), id Int8)
{"obj":{"arr":[{"k1":[],"k3":"qqq"},{"k1":[],"k3":"www"}],"id":1}}
{"obj":{"arr":[{"k1":[{"k2":"aaa","k4":[]}],"k3":"eee"}],"id":2}}
{"obj":{"arr":[{"k1":[{"k2":"bbb","k4":[{"k5":10}]},{"k2":"ccc","k4":[{"k5":20}]}],"k3":"rrr"}],"id":3}}
['qqq','www'] [[],[]] [[],[]]
['eee'] [['aaa']] [[[]]]
['rrr'] [['bbb','ccc']] [[[10],[20]]]
1
[[],[]]
[[[]]]
[[[10],[20]]]

48
tests/queries/0_stateless/01825_type_json_17.sql
Normal file
@ -0,0 +1,48 @@
-- Tags: no-fasttest

DROP TABLE IF EXISTS t_json_17;
SET allow_experimental_object_type = 1;
SET output_format_json_named_tuples_as_objects = 1;

CREATE TABLE t_json_17(obj JSON)
ENGINE = MergeTree ORDER BY tuple();

DROP FUNCTION IF EXISTS hasValidSizes17;
CREATE FUNCTION hasValidSizes17 AS (arr1, arr2) -> length(arr1) = length(arr2) AND arrayAll((x, y) -> length(x) = length(y), arr1, arr2);

SYSTEM STOP MERGES t_json_17;

INSERT INTO t_json_17 FORMAT JSONAsObject {"id": 1, "arr": [{"k1": [{"k2": "aaa", "k3": "bbb"}, {"k2": "ccc"}]}]}
INSERT INTO t_json_17 FORMAT JSONAsObject {"id": 2, "arr": [{"k1": [{"k3": "ddd", "k4": 10}, {"k4": 20}], "k5": {"k6": "foo"}}]}

SELECT toTypeName(obj) FROM t_json_17 LIMIT 1;
SELECT obj FROM t_json_17 ORDER BY obj.id FORMAT JSONEachRow;
SELECT obj.arr.k1.k3, obj.arr.k1.k2 FROM t_json_17 ORDER BY obj.id;
SELECT sum(hasValidSizes17(obj.arr.k1.k3, obj.arr.k1.k2)) == count() FROM t_json_17;
SELECT obj.arr.k1.k4 FROM t_json_17 ORDER BY obj.id;

TRUNCATE TABLE t_json_17;

INSERT INTO t_json_17 FORMAT JSONAsObject {"id": 1, "arr": [{"k1": [{"k2": "aaa"}]}]}
INSERT INTO t_json_17 FORMAT JSONAsObject {"id": 2, "arr": [{"k1": [{"k2": "bbb", "k3": [{"k4": 10}]}, {"k2": "ccc", "k3": [{"k4": 20}]}]}]}

SELECT toTypeName(obj) FROM t_json_17 LIMIT 1;
SELECT obj FROM t_json_17 ORDER BY obj.id FORMAT JSONEachRow;
SELECT obj.arr.k1.k2, obj.arr.k1.k3.k4 FROM t_json_17 ORDER BY obj.id;
SELECT sum(hasValidSizes17(obj.arr.k1.k2, obj.arr.k1.k3.k4)) == count() FROM t_json_17;
SELECT obj.arr.k1.k3.k4 FROM t_json_17 ORDER BY obj.id;

TRUNCATE TABLE t_json_17;

INSERT INTO t_json_17 FORMAT JSONAsObject {"id": 1, "arr": [{"k3": "qqq"}, {"k3": "www"}]}
INSERT INTO t_json_17 FORMAT JSONAsObject {"id": 2, "arr": [{"k1": [{"k2": "aaa"}], "k3": "eee"}]}
INSERT INTO t_json_17 FORMAT JSONAsObject {"id": 3, "arr": [{"k1": [{"k2": "bbb", "k4": [{"k5": 10}]}, {"k2": "ccc", "k4": [{"k5": 20}]}], "k3": "rrr"}]}

SELECT toTypeName(obj) FROM t_json_17 LIMIT 1;
SELECT obj FROM t_json_17 ORDER BY obj.id FORMAT JSONEachRow;
SELECT obj.arr.k3, obj.arr.k1.k2, obj.arr.k1.k4.k5 FROM t_json_17 ORDER BY obj.id;
SELECT sum(hasValidSizes17(obj.arr.k1.k2, obj.arr.k1.k4.k5)) == count() FROM t_json_17;
SELECT obj.arr.k1.k4.k5 FROM t_json_17 ORDER BY obj.id;

DROP FUNCTION hasValidSizes17;
DROP TABLE t_json_17;