Merge branch 'master' into add_compatibility_check

This commit is contained in:
alesapin 2021-11-11 17:26:00 +03:00 committed by GitHub
commit 3a6857e106
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 692 additions and 502 deletions

View File

@ -412,35 +412,35 @@ jobs:
docker kill $(docker ps -q) ||:
docker rm -f $(docker ps -a -q) ||:
sudo rm -fr $TEMP_PATH
IntegrationTestsAsan:
needs: [BuilderDebAsan]
runs-on: [self-hosted, stress-tester]
steps:
- name: Download json reports
uses: actions/download-artifact@v2
with:
path: ${{runner.temp}}/reports_dir
- name: Check out repository code
uses: actions/checkout@v2
- name: Integration test
env:
TEMP_PATH: ${{runner.temp}}/integration_tests_asan
REPORTS_PATH: ${{runner.temp}}/reports_dir
CHECK_NAME: 'Integration tests (asan, actions)'
REPO_COPY: ${{runner.temp}}/integration_tests_asan/ClickHouse
REQUIRED_BUILD_NUMBER: 3
run: |
sudo rm -fr $TEMP_PATH
mkdir -p $TEMP_PATH
cp -r $GITHUB_WORKSPACE $TEMP_PATH
cd $REPO_COPY/tests/ci
python3 integration_test_check.py "$CHECK_NAME" $REQUIRED_BUILD_NUMBER
- name: Cleanup
if: always()
run: |
docker kill $(docker ps -q) ||:
docker rm -f $(docker ps -a -q) ||:
sudo rm -fr $TEMP_PATH
# IntegrationTestsAsan:
# needs: [BuilderDebAsan]
# runs-on: [self-hosted, stress-tester]
# steps:
# - name: Download json reports
# uses: actions/download-artifact@v2
# with:
# path: ${{runner.temp}}/reports_dir
# - name: Check out repository code
# uses: actions/checkout@v2
# - name: Integration test
# env:
# TEMP_PATH: ${{runner.temp}}/integration_tests_asan
# REPORTS_PATH: ${{runner.temp}}/reports_dir
# CHECK_NAME: 'Integration tests (asan, actions)'
# REPO_COPY: ${{runner.temp}}/integration_tests_asan/ClickHouse
# REQUIRED_BUILD_NUMBER: 3
# run: |
# sudo rm -fr $TEMP_PATH
# mkdir -p $TEMP_PATH
# cp -r $GITHUB_WORKSPACE $TEMP_PATH
# cd $REPO_COPY/tests/ci
# python3 integration_test_check.py "$CHECK_NAME" $REQUIRED_BUILD_NUMBER
# - name: Cleanup
# if: always()
# run: |
# docker kill $(docker ps -q) ||:
# docker rm -f $(docker ps -a -q) ||:
# sudo rm -fr $TEMP_PATH
UnitTestsAsan:
needs: [BuilderDebAsan]
runs-on: [self-hosted, func-tester]
@ -572,7 +572,8 @@ jobs:
docker rm -f $(docker ps -a -q) ||:
sudo rm -fr $TEMP_PATH
FinishCheck:
needs: [StyleCheck, DockerHubPush, CheckLabels, BuilderReport, FastTest, FunctionalStatelessTestDebug, FunctionalStatefulTestDebug, DocsCheck, StressTestDebug, ASTFuzzerTestDebug, IntegrationTestsAsan, PVSCheck, UnitTestsAsan, SplitBuildSmokeTest, CompatibilityCheck]
needs: [StyleCheck, DockerHubPush, CheckLabels, BuilderReport, FastTest, FunctionalStatelessTestDebug, FunctionalStatefulTestDebug, DocsCheck, StressTestDebug, ASTFuzzerTestDebug, PVSCheck, UnitTestsAsan, SplitBuildSmokeTest, CompatibilityCheck]
runs-on: [self-hosted, style-checker]
steps:
- name: Check out repository code

View File

@ -5,7 +5,7 @@ toc_title: Bit
# Bit Functions {#bit-functions}
Bit functions work for any pair of types from UInt8, UInt16, UInt32, UInt64, Int8, Int16, Int32, Int64, Float32, or Float64.
Bit functions work for any pair of types from `UInt8`, `UInt16`, `UInt32`, `UInt64`, `Int8`, `Int16`, `Int32`, `Int64`, `Float32`, or `Float64`. Some functions support `String` and `FixedString` types.
The result type is an integer with bits equal to the maximum bits of its arguments. If at least one of the arguments is signed, the result is a signed number. If an argument is a floating-point number, it is cast to Int64.
@ -19,8 +19,100 @@ The result type is an integer with bits equal to the maximum bits of its argumen
## bitShiftLeft(a, b) {#bitshiftlefta-b}
Shifts the binary representation of a value to the left by a specified number of bit positions.
A `FixedString` or a `String` is treated as a single multibyte value.
Bits of a `FixedString` value are lost as they are shifted out. In contrast, a `String` value is extended with additional bytes, so no bits are lost.
**Syntax**
``` sql
bitShiftLeft(a, b)
```
**Arguments**
- `a` — A value to shift. [Integer types](../../sql-reference/data-types/int-uint.md), [String](../../sql-reference/data-types/string.md) or [FixedString](../../sql-reference/data-types/fixedstring.md).
- `b` — The number of shift positions. [Unsigned integer types](../../sql-reference/data-types/int-uint.md); types of 64 bits or fewer are allowed.
**Returned value**
- Shifted value.
The type of the returned value is the same as the type of the input value.
**Example**
In the following queries, the [bin](encoding-functions.md#bin) and [hex](encoding-functions.md#hex) functions are used to show the bits of the shifted values.
``` sql
SELECT 99 AS a, bin(a), bitShiftLeft(a, 2) AS a_shifted, bin(a_shifted);
SELECT 'abc' AS a, hex(a), bitShiftLeft(a, 4) AS a_shifted, hex(a_shifted);
SELECT toFixedString('abc', 3) AS a, hex(a), bitShiftLeft(a, 4) AS a_shifted, hex(a_shifted);
```
Result:
``` text
┌──a─┬─bin(99)──┬─a_shifted─┬─bin(bitShiftLeft(99, 2))─┐
│ 99 │ 01100011 │ 140 │ 10001100 │
└────┴──────────┴───────────┴──────────────────────────┘
┌─a───┬─hex('abc')─┬─a_shifted─┬─hex(bitShiftLeft('abc', 4))─┐
│ abc │ 616263 │ &0 │ 06162630 │
└─────┴────────────┴───────────┴─────────────────────────────┘
┌─a───┬─hex(toFixedString('abc', 3))─┬─a_shifted─┬─hex(bitShiftLeft(toFixedString('abc', 3), 4))─┐
│ abc │ 616263 │ &0 │ 162630 │
└─────┴──────────────────────────────┴───────────┴───────────────────────────────────────────────┘
```
## bitShiftRight(a, b) {#bitshiftrighta-b}
Shifts the binary representation of a value to the right by a specified number of bit positions.
A `FixedString` or a `String` is treated as a single multibyte value. Note that the length of a `String` value is reduced as bits are shifted out.
**Syntax**
``` sql
bitShiftRight(a, b)
```
**Arguments**
- `a` — A value to shift. [Integer types](../../sql-reference/data-types/int-uint.md), [String](../../sql-reference/data-types/string.md) or [FixedString](../../sql-reference/data-types/fixedstring.md).
- `b` — The number of shift positions. [Unsigned integer types](../../sql-reference/data-types/int-uint.md); types of 64 bits or fewer are allowed.
**Returned value**
- Shifted value.
The type of the returned value is the same as the type of the input value.
**Example**
Query:
``` sql
SELECT 101 AS a, bin(a), bitShiftRight(a, 2) AS a_shifted, bin(a_shifted);
SELECT 'abc' AS a, hex(a), bitShiftRight(a, 12) AS a_shifted, hex(a_shifted);
SELECT toFixedString('abc', 3) AS a, hex(a), bitShiftRight(a, 12) AS a_shifted, hex(a_shifted);
```
Result:
``` text
┌───a─┬─bin(101)─┬─a_shifted─┬─bin(bitShiftRight(101, 2))─┐
│ 101 │ 01100101 │ 25 │ 00011001 │
└─────┴──────────┴───────────┴────────────────────────────┘
┌─a───┬─hex('abc')─┬─a_shifted─┬─hex(bitShiftRight('abc', 12))─┐
│ abc │ 616263 │ │ 0616 │
└─────┴────────────┴───────────┴───────────────────────────────┘
┌─a───┬─hex(toFixedString('abc', 3))─┬─a_shifted─┬─hex(bitShiftRight(toFixedString('abc', 3), 12))─┐
│ abc │ 616263 │ │ 000616 │
└─────┴──────────────────────────────┴───────────┴─────────────────────────────────────────────────┘
```
## bitRotateLeft(a, b) {#bitrotatelefta-b}
## bitRotateRight(a, b) {#bitrotaterighta-b}

View File

@ -5,7 +5,7 @@ toc_title: "Битовые функции"
# Битовые функции {#bitovye-funktsii}
Битовые функции работают для любой пары типов из UInt8, UInt16, UInt32, UInt64, Int8, Int16, Int32, Int64, Float32, Float64.
Битовые функции работают для любой пары типов из `UInt8`, `UInt16`, `UInt32`, `UInt64`, `Int8`, `Int16`, `Int32`, `Int64`, `Float32`, `Float64`.
Тип результата - целое число, битность которого равна максимальной битности аргументов. Если хотя бы один аргумент знаковый, то результат - знаковое число. Если аргумент - число с плавающей запятой - оно приводится к Int64.
@ -19,8 +19,100 @@ toc_title: "Битовые функции"
## bitShiftLeft(a, b) {#bitshiftlefta-b}
Сдвигает влево на заданное количество битов бинарное представление значения.
Если передан аргумент типа `FixedString` или `String`, то он рассматривается как одно многобайтовое значение.
Биты `FixedString` теряются по мере того, как выдвигаются за пределы строки. Значение `String` дополняется байтами, поэтому его биты не теряются.
**Синтаксис**
``` sql
bitShiftLeft(a, b)
```
**Аргументы**
- `a` — сдвигаемое значение. [Целое число](../../sql-reference/data-types/int-uint.md), [String](../../sql-reference/data-types/string.md) или [FixedString](../../sql-reference/data-types/fixedstring.md).
- `b` — величина сдвига. [Беззнаковое целое число](../../sql-reference/data-types/int-uint.md), допустимы типы с разрядностью не более 64 битов.
**Возвращаемое значение**
- Сдвинутое значение.
Тип совпадает с типом сдвигаемого значения.
**Пример**
В запросах используются функции [bin](encoding-functions.md#bin) и [hex](encoding-functions.md#hex), чтобы наглядно показать биты после сдвига.
``` sql
SELECT 99 AS a, bin(a), bitShiftLeft(a, 2) AS a_shifted, bin(a_shifted);
SELECT 'abc' AS a, hex(a), bitShiftLeft(a, 4) AS a_shifted, hex(a_shifted);
SELECT toFixedString('abc', 3) AS a, hex(a), bitShiftLeft(a, 4) AS a_shifted, hex(a_shifted);
```
Результат:
``` text
┌──a─┬─bin(99)──┬─a_shifted─┬─bin(bitShiftLeft(99, 2))─┐
│ 99 │ 01100011 │ 140 │ 10001100 │
└────┴──────────┴───────────┴──────────────────────────┘
┌─a───┬─hex('abc')─┬─a_shifted─┬─hex(bitShiftLeft('abc', 4))─┐
│ abc │ 616263 │ &0 │ 06162630 │
└─────┴────────────┴───────────┴─────────────────────────────┘
┌─a───┬─hex(toFixedString('abc', 3))─┬─a_shifted─┬─hex(bitShiftLeft(toFixedString('abc', 3), 4))─┐
│ abc │ 616263 │ &0 │ 162630 │
└─────┴──────────────────────────────┴───────────┴───────────────────────────────────────────────┘
```
## bitShiftRight(a, b) {#bitshiftrighta-b}
Сдвигает вправо на заданное количество битов бинарное представление значения.
Если передан аргумент типа `FixedString` или `String`, то он рассматривается как одно многобайтовое значение. Длина значения типа `String` уменьшается по мере сдвига.
**Синтаксис**
``` sql
bitShiftRight(a, b)
```
**Аргументы**
- `a` — сдвигаемое значение. [Целое число](../../sql-reference/data-types/int-uint.md), [String](../../sql-reference/data-types/string.md) или [FixedString](../../sql-reference/data-types/fixedstring.md).
- `b` — величина сдвига. [Беззнаковое целое число](../../sql-reference/data-types/int-uint.md), допустимы типы с разрядностью не более 64 битов.
**Возвращаемое значение**
- Сдвинутое значение.
Тип совпадает с типом сдвигаемого значения.
**Пример**
Запрос:
``` sql
SELECT 101 AS a, bin(a), bitShiftRight(a, 2) AS a_shifted, bin(a_shifted);
SELECT 'abc' AS a, hex(a), bitShiftRight(a, 12) AS a_shifted, hex(a_shifted);
SELECT toFixedString('abc', 3) AS a, hex(a), bitShiftRight(a, 12) AS a_shifted, hex(a_shifted);
```
Результат:
``` text
┌───a─┬─bin(101)─┬─a_shifted─┬─bin(bitShiftRight(101, 2))─┐
│ 101 │ 01100101 │ 25 │ 00011001 │
└─────┴──────────┴───────────┴────────────────────────────┘
┌─a───┬─hex('abc')─┬─a_shifted─┬─hex(bitShiftRight('abc', 12))─┐
│ abc │ 616263 │ │ 0616 │
└─────┴────────────┴───────────┴───────────────────────────────┘
┌─a───┬─hex(toFixedString('abc', 3))─┬─a_shifted─┬─hex(bitShiftRight(toFixedString('abc', 3), 12))─┐
│ abc │ 616263 │ │ 000616 │
└─────┴──────────────────────────────┴───────────┴─────────────────────────────────────────────────┘
```
## bitTest {#bittest}
Принимает любое целое число и конвертирует его в [двоичное число](https://en.wikipedia.org/wiki/Binary_number), возвращает значение бита в указанной позиции. Отсчет начинается с 0 справа налево.

View File

@ -1,82 +1,74 @@
---
machine_translated: true
machine_translated_rev: 72537a2d527c63c07aa5d2361a8829f3895cf2bd
toc_priority: 54
toc_title: "\u6D4B\u8BD5\u786C\u4EF6"
---
# 如何使用ClickHouse测试您的硬件 {#how-to-test-your-hardware-with-clickhouse}
# 如何使用 ClickHouse 测试您的硬件 {#how-to-test-your-hardware-with-clickhouse}
使用此指令您可以在任何服务器上运行基本的ClickHouse性能测试而无需安装ClickHouse软件包。
你可以在任何服务器上运行基本的 ClickHouse 性能测试,而无需安装 ClickHouse 软件包。
1. 转到 “commits” 页数https://github.com/ClickHouse/ClickHouse/commits/master
2. 点击第一个绿色复选标记或红色十字与绿色 “ClickHouse Build Check” 然后点击 “Details” 附近链接 “ClickHouse Build Check”. 在一些提交中没有这样的链接,例如与文档的提交。 在这种情况下,请选择具有此链接的最近提交。
## 自动运行
3. 将链接复制到 “clickhouse” 二进制为amd64或aarch64.
你可以使用一个简单脚本来运行基准测试。
4. ssh到服务器并使用wget下载它:
1. 下载脚本
```
wget https://raw.githubusercontent.com/ClickHouse/ClickHouse/master/benchmark/hardware.sh
```
<!-- -->
2. 运行脚本
```
chmod a+x ./hardware.sh
./hardware.sh
```
# For amd64:
wget https://clickhouse-builds.s3.yandex.net/0/00ba767f5d2a929394ea3be193b1f79074a1c4bc/1578163263_binary/clickhouse
# For aarch64:
wget https://clickhouse-builds.s3.yandex.net/0/00ba767f5d2a929394ea3be193b1f79074a1c4bc/1578161264_binary/clickhouse
# Then do:
chmod a+x clickhouse
3. 复制输出的信息并将它发送给 feedback@clickhouse.com
1. 下载配置:
所有的结果都在这里公布: https://clickhouse.com/benchmark/hardware/
<!-- -->
wget https://raw.githubusercontent.com/ClickHouse/ClickHouse/master/programs/server/config.xml
wget https://raw.githubusercontent.com/ClickHouse/ClickHouse/master/programs/server/users.xml
mkdir config.d
wget https://raw.githubusercontent.com/ClickHouse/ClickHouse/master/programs/server/config.d/path.xml -O config.d/path.xml
wget https://raw.githubusercontent.com/ClickHouse/ClickHouse/master/programs/server/config.d/log_to_console.xml -O config.d/log_to_console.xml
## 人工运行
1. 下载基准测试文件:
或者,你可以按照以下步骤实施基准测试。
```bash
# For amd64:
wget https://builds.clickhouse.com/master/amd64/clickhouse
# For aarch64:
wget https://builds.clickhouse.com/master/aarch64/clickhouse
# Then do:
chmod a+x clickhouse
```
<!-- -->
2. 下载基准文件
```bash
wget https://raw.githubusercontent.com/ClickHouse/ClickHouse/master/benchmark/clickhouse/benchmark-new.sh
chmod a+x benchmark-new.sh
wget https://raw.githubusercontent.com/ClickHouse/ClickHouse/master/benchmark/clickhouse/queries.sql
```
wget https://raw.githubusercontent.com/ClickHouse/ClickHouse/master/benchmark/clickhouse/benchmark-new.sh
chmod a+x benchmark-new.sh
wget https://raw.githubusercontent.com/ClickHouse/ClickHouse/master/benchmark/clickhouse/queries.sql
3. 根据 [Yandex.Metrica 数据集](../getting-started/example-datasets/metrica.md) 中的说明下载测试数据(“hits” 数据表包含 1 亿行记录)。
```bash
wget https://datasets.clickhouse.com/hits/partitions/hits_100m_obfuscated_v1.tar.xz
tar xvf hits_100m_obfuscated_v1.tar.xz -C .
mv hits_100m_obfuscated_v1/* .
```
1. 根据下载测试数据 [Yandex梅里卡数据集](../getting-started/example-datasets/metrica.md) 说明 (“hits” 表包含100万行
4. 运行服务器:
```bash
./clickhouse server
```
<!-- -->
5. 检查数据:在另一个终端中通过 ssh 登录服务器
```bash
./clickhouse client --query "SELECT count() FROM hits_100m_obfuscated"
100000000
```
6. 运行基准测试:
```bash
./benchmark-new.sh hits_100m_obfuscated
```
wget https://datasets.clickhouse.com/hits/partitions/hits_100m_obfuscated_v1.tar.xz
tar xvf hits_100m_obfuscated_v1.tar.xz -C .
mv hits_100m_obfuscated_v1/* .
7. 将有关硬件配置的型号和信息发送到 clickhouse-feedback@yandex-team.com
1. 运行服务器:
<!-- -->
./clickhouse server
1. 检查数据ssh到另一个终端中的服务器
<!-- -->
./clickhouse client --query "SELECT count() FROM hits_100m_obfuscated"
100000000
1. 编辑benchmark-new.sh改变 `clickhouse-client``./clickhouse client` 并添加 `-max_memory_usage 100000000000` 参数。
<!-- -->
mcedit benchmark-new.sh
1. 运行基准测试:
<!-- -->
./benchmark-new.sh hits_100m_obfuscated
1. 将有关硬件配置的编号和信息发送到clickhouse-feedback@yandex-team.com
所有结果都在这里公布https://clickhouse.技术/基准/硬件/
所有结果都在这里公布https://clickhouse.com/benchmark/hardware/

View File

@ -1,6 +1,4 @@
---
machine_translated: true
machine_translated_rev: 72537a2d527c63c07aa5d2361a8829f3895cf2bd
toc_priority: 58
toc_title: "\u67E5\u8BE2\u6743\u9650"
---

View File

@ -1,32 +1,30 @@
---
machine_translated: true
machine_translated_rev: 72537a2d527c63c07aa5d2361a8829f3895cf2bd
toc_priority: 61
toc_title: "\u8BBE\u7F6E\u914D\u7F6E\u6587\u4EF6"
toc_title: "\u8BBE\u7F6E\u914D\u7F6E"
---
# 设置配置文件 {#settings-profiles}
# 设置配置 {#settings-profiles}
设置配置文件是以相同名称分组的设置的集合
设置配置是设置的集合,并按照相同的名称进行分组。
!!! note "信息"
ClickHouse还支持 [SQL驱动的工作流](../access-rights.md#access-control) 用于管理设置配置文件 我们建议使用它。
ClickHouse 还支持 [SQL驱动的工作流](../../operations/access-rights.md#access-control) 管理设置配置。我们建议使用它。
配置文件可以有任何名称。 配置文件可以有任何名称。 您可以为不同的用户指定相同的配置文件。 您可以在设置配置文件中编写的最重要的事情是 `readonly=1`,这确保只读访问。
设置配置可以任意命名。你可以为不同的用户指定相同的设置配置。您可以在设置配置中写入的最重要的内容是 `readonly=1`,这将确保只读访问。
设置配置文件可以彼此继承。 要使用继承,请指示一个或多个 `profile` 配置文件中列出的其他设置之前的设置。 如果在不同的配置文件中定义了一个设置,则使用最新定义。
设置配置可以彼此继承。要使用继承,请在文件中列举的其他设置之前,指定一个或多个 `profile` 设置。如果在不同的设置配置中定义了同一个设置,则使用最新的定义。
要应用配置文件中的所有设置,请设置 `profile` 设置。
要应用设置配置中的所有设置,请设定 `profile` 设置。
示例:
安装 `web` 侧写
添加 `web` 配置。
``` sql
SET profile = 'web'
```
设置配置文件在用户配置文件中声明。 这通常是 `users.xml`.
设置配置在用户配置文件中声明。这通常是 `users.xml`.
示例:
@ -72,10 +70,10 @@ SET profile = 'web'
</profiles>
```
该示例指定了两个配置文件: `default``web`.
这个示例指定了两个配置: `default` 和 `web`。
`default` 配置文件有一个特殊用途:它必须始终存在并在启动服务时应用。 换句话说, `default` 配置文件包含默认设置。
这个 `default` 配置有一个特殊用途:它必须始终存在并在启动服务时应用。换句话说, `default` 配置包含默认设置。
`web` 配置文件是一个常规的配置文件,可以使用设置 `SET` 查询或在HTTP查询中使用URL参数
`web` 配置是一个常规的配置,它可以通过 `SET` 查询进行设定,也可以通过在 HTTP 查询中使用 URL 参数进行设定。
[原始文章](https://clickhouse.com/docs/en/operations/settings/settings_profiles/) <!--hide-->

View File

@ -80,7 +80,6 @@ BlockIO InterpreterAlterQuery::executeToTable(const ASTAlterQuery & alter)
if (table->isStaticStorage())
throw Exception(ErrorCodes::TABLE_IS_READ_ONLY, "Table is read-only");
auto table_lock = table->lockForShare(getContext()->getCurrentQueryId(), getContext()->getSettingsRef().lock_acquire_timeout);
auto alter_lock = table->lockForAlter(getContext()->getSettingsRef().lock_acquire_timeout);
auto metadata_snapshot = table->getInMemoryMetadataPtr();
/// Add default database to table identifiers that we can encounter in e.g. default expressions, mutation expression, etc.
@ -160,6 +159,7 @@ BlockIO InterpreterAlterQuery::executeToTable(const ASTAlterQuery & alter)
if (!alter_commands.empty())
{
auto alter_lock = table->lockForAlter(getContext()->getSettingsRef().lock_acquire_timeout);
StorageInMemoryMetadata metadata = table->getInMemoryMetadata();
alter_commands.validate(metadata, getContext());
alter_commands.prepare(metadata);

View File

@ -2191,12 +2191,6 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
{
renameTempPartAndReplace(part_desc->res_part, nullptr, &transaction);
getCommitPartOps(ops, part_desc->res_part);
if (ops.size() > zkutil::MULTI_BATCH_SIZE)
{
zookeeper->multi(ops);
ops.clear();
}
}
if (!ops.empty())
@ -6257,186 +6251,195 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
/// NOTE: Some covered parts may be missing in src_all_parts if corresponding log entries are not executed yet.
DataPartsVector src_all_parts = src_data.getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id);
DataPartsVector src_parts;
MutableDataPartsVector dst_parts;
Strings block_id_paths;
Strings part_checksums;
auto zookeeper = getZooKeeper();
std::vector<EphemeralLockInZooKeeper> ephemeral_locks;
LOG_DEBUG(log, "Cloning {} parts", src_all_parts.size());
static const String TMP_PREFIX = "tmp_replace_from_";
auto zookeeper = getZooKeeper();
String alter_partition_version_path = zookeeper_path + "/alter_partition_version";
Coordination::Stat alter_partition_version_stat;
zookeeper->get(alter_partition_version_path, &alter_partition_version_stat);
/// Firstly, generate last block number and compute drop_range
/// NOTE: Even if we make ATTACH PARTITION instead of REPLACE PARTITION drop_range will not be empty, it will contain a block.
/// So, such case has special meaning, if drop_range contains only one block it means that nothing to drop.
/// TODO why not to add normal DROP_RANGE entry to replication queue if `replace` is true?
MergeTreePartInfo drop_range;
std::optional<EphemeralLockInZooKeeper> delimiting_block_lock;
bool partition_was_empty = !getFakePartCoveringAllPartsInPartition(partition_id, drop_range, delimiting_block_lock, true);
if (replace && partition_was_empty)
/// Retry if alter_partition_version changes
for (size_t retry = 0; retry < 1000; ++retry)
{
/// Nothing to drop, will just attach new parts
LOG_INFO(log, "Partition {} was empty, REPLACE PARTITION will work as ATTACH PARTITION FROM", drop_range.partition_id);
replace = false;
}
DataPartsVector src_parts;
MutableDataPartsVector dst_parts;
Strings block_id_paths;
Strings part_checksums;
std::vector<EphemeralLockInZooKeeper> ephemeral_locks;
String alter_partition_version_path = zookeeper_path + "/alter_partition_version";
Coordination::Stat alter_partition_version_stat;
zookeeper->get(alter_partition_version_path, &alter_partition_version_stat);
if (!replace)
{
/// It's ATTACH PARTITION FROM, not REPLACE PARTITION. We have to reset drop range
drop_range = makeDummyDropRangeForMovePartitionOrAttachPartitionFrom(partition_id);
}
assert(replace == !LogEntry::ReplaceRangeEntry::isMovePartitionOrAttachFrom(drop_range));
String drop_range_fake_part_name = getPartNamePossiblyFake(format_version, drop_range);
for (const auto & src_part : src_all_parts)
{
/// We also make some kind of deduplication to avoid duplicated parts in case of ATTACH PARTITION
/// Assume that merges in the partition are quite rare
/// Save deduplication block ids with special prefix replace_partition
if (!canReplacePartition(src_part))
throw Exception(
"Cannot replace partition '" + partition_id + "' because part '" + src_part->name + "' has inconsistent granularity with table",
ErrorCodes::LOGICAL_ERROR);
String hash_hex = src_part->checksums.getTotalChecksumHex();
if (replace)
LOG_INFO(log, "Trying to replace {} with hash_hex {}", src_part->name, hash_hex);
else
LOG_INFO(log, "Trying to attach {} with hash_hex {}", src_part->name, hash_hex);
String block_id_path = replace ? "" : (fs::path(zookeeper_path) / "blocks" / (partition_id + "_replace_from_" + hash_hex));
auto lock = allocateBlockNumber(partition_id, zookeeper, block_id_path);
if (!lock)
/// Firstly, generate last block number and compute drop_range
/// NOTE: Even if we make ATTACH PARTITION instead of REPLACE PARTITION drop_range will not be empty, it will contain a block.
/// So, such case has special meaning, if drop_range contains only one block it means that nothing to drop.
/// TODO why not to add normal DROP_RANGE entry to replication queue if `replace` is true?
MergeTreePartInfo drop_range;
std::optional<EphemeralLockInZooKeeper> delimiting_block_lock;
bool partition_was_empty = !getFakePartCoveringAllPartsInPartition(partition_id, drop_range, delimiting_block_lock, true);
if (replace && partition_was_empty)
{
LOG_INFO(log, "Part {} (hash {}) has been already attached", src_part->name, hash_hex);
continue;
/// Nothing to drop, will just attach new parts
LOG_INFO(log, "Partition {} was empty, REPLACE PARTITION will work as ATTACH PARTITION FROM", drop_range.partition_id);
replace = false;
}
UInt64 index = lock->getNumber();
MergeTreePartInfo dst_part_info(partition_id, index, index, src_part->info.level);
auto dst_part = cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, metadata_snapshot);
src_parts.emplace_back(src_part);
dst_parts.emplace_back(dst_part);
ephemeral_locks.emplace_back(std::move(*lock));
block_id_paths.emplace_back(block_id_path);
part_checksums.emplace_back(hash_hex);
}
ReplicatedMergeTreeLogEntryData entry;
{
auto src_table_id = src_data.getStorageID();
entry.type = ReplicatedMergeTreeLogEntryData::REPLACE_RANGE;
entry.source_replica = replica_name;
entry.create_time = time(nullptr);
entry.replace_range_entry = std::make_shared<ReplicatedMergeTreeLogEntryData::ReplaceRangeEntry>();
auto & entry_replace = *entry.replace_range_entry;
entry_replace.drop_range_part_name = drop_range_fake_part_name;
entry_replace.from_database = src_table_id.database_name;
entry_replace.from_table = src_table_id.table_name;
for (const auto & part : src_parts)
entry_replace.src_part_names.emplace_back(part->name);
for (const auto & part : dst_parts)
entry_replace.new_part_names.emplace_back(part->name);
for (const String & checksum : part_checksums)
entry_replace.part_names_checksums.emplace_back(checksum);
entry_replace.columns_version = -1;
}
/// Remove deduplication block_ids of replacing parts
if (replace)
clearBlocksInPartition(*zookeeper, drop_range.partition_id, drop_range.max_block, drop_range.max_block);
DataPartsVector parts_to_remove;
Coordination::Responses op_results;
try
{
Coordination::Requests ops;
for (size_t i = 0; i < dst_parts.size(); ++i)
if (!replace)
{
getCommitPartOps(ops, dst_parts[i], block_id_paths[i]);
ephemeral_locks[i].getUnlockOps(ops);
if (ops.size() > zkutil::MULTI_BATCH_SIZE)
{
/// It is unnecessary to add parts to working set until we commit log entry
zookeeper->multi(ops);
ops.clear();
}
/// It's ATTACH PARTITION FROM, not REPLACE PARTITION. We have to reset drop range
drop_range = makeDummyDropRangeForMovePartitionOrAttachPartitionFrom(partition_id);
}
if (auto txn = query_context->getZooKeeperMetadataTransaction())
txn->moveOpsTo(ops);
assert(replace == !LogEntry::ReplaceRangeEntry::isMovePartitionOrAttachFrom(drop_range));
delimiting_block_lock->getUnlockOps(ops);
/// Check and update version to avoid race with DROP_RANGE
ops.emplace_back(zkutil::makeSetRequest(alter_partition_version_path, "", alter_partition_version_stat.version));
/// Just update version, because merges assignment relies on it
ops.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "log", "", -1));
ops.emplace_back(zkutil::makeCreateRequest(fs::path(zookeeper_path) / "log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential));
String drop_range_fake_part_name = getPartNamePossiblyFake(format_version, drop_range);
Transaction transaction(*this);
for (const auto & src_part : src_all_parts)
{
auto data_parts_lock = lockParts();
/// We also make some kind of deduplication to avoid duplicated parts in case of ATTACH PARTITION
/// Assume that merges in the partition are quite rare
/// Save deduplication block ids with special prefix replace_partition
for (MutableDataPartPtr & part : dst_parts)
renameTempPartAndReplace(part, nullptr, &transaction, data_parts_lock);
}
if (!canReplacePartition(src_part))
throw Exception(
"Cannot replace partition '" + partition_id + "' because part '" + src_part->name + "' has inconsistent granularity with table",
ErrorCodes::LOGICAL_ERROR);
Coordination::Error code = zookeeper->tryMulti(ops, op_results);
if (code == Coordination::Error::ZOK)
delimiting_block_lock->assumeUnlocked();
else if (code == Coordination::Error::ZBADVERSION)
throw Exception(ErrorCodes::CANNOT_ASSIGN_ALTER, "Cannot assign ALTER PARTITION, because another ALTER PARTITION query was concurrently executed");
else
zkutil::KeeperMultiException::check(code, ops, op_results);
String hash_hex = src_part->checksums.getTotalChecksumHex();
{
auto data_parts_lock = lockParts();
transaction.commit(&data_parts_lock);
if (replace)
parts_to_remove = removePartsInRangeFromWorkingSet(drop_range, true, data_parts_lock);
LOG_INFO(log, "Trying to replace {} with hash_hex {}", src_part->name, hash_hex);
else
LOG_INFO(log, "Trying to attach {} with hash_hex {}", src_part->name, hash_hex);
String block_id_path = replace ? "" : (fs::path(zookeeper_path) / "blocks" / (partition_id + "_replace_from_" + hash_hex));
auto lock = allocateBlockNumber(partition_id, zookeeper, block_id_path);
if (!lock)
{
LOG_INFO(log, "Part {} (hash {}) has been already attached", src_part->name, hash_hex);
continue;
}
UInt64 index = lock->getNumber();
MergeTreePartInfo dst_part_info(partition_id, index, index, src_part->info.level);
auto dst_part = cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, metadata_snapshot);
src_parts.emplace_back(src_part);
dst_parts.emplace_back(dst_part);
ephemeral_locks.emplace_back(std::move(*lock));
block_id_paths.emplace_back(block_id_path);
part_checksums.emplace_back(hash_hex);
}
PartLog::addNewParts(getContext(), dst_parts, watch.elapsed());
}
catch (...)
{
PartLog::addNewParts(getContext(), dst_parts, watch.elapsed(), ExecutionStatus::fromCurrentException());
throw;
ReplicatedMergeTreeLogEntryData entry;
{
auto src_table_id = src_data.getStorageID();
entry.type = ReplicatedMergeTreeLogEntryData::REPLACE_RANGE;
entry.source_replica = replica_name;
entry.create_time = time(nullptr);
entry.replace_range_entry = std::make_shared<ReplicatedMergeTreeLogEntryData::ReplaceRangeEntry>();
auto & entry_replace = *entry.replace_range_entry;
entry_replace.drop_range_part_name = drop_range_fake_part_name;
entry_replace.from_database = src_table_id.database_name;
entry_replace.from_table = src_table_id.table_name;
for (const auto & part : src_parts)
entry_replace.src_part_names.emplace_back(part->name);
for (const auto & part : dst_parts)
entry_replace.new_part_names.emplace_back(part->name);
for (const String & checksum : part_checksums)
entry_replace.part_names_checksums.emplace_back(checksum);
entry_replace.columns_version = -1;
}
/// Remove deduplication block_ids of replacing parts
if (replace)
clearBlocksInPartition(*zookeeper, drop_range.partition_id, drop_range.max_block, drop_range.max_block);
DataPartsVector parts_to_remove;
Coordination::Responses op_results;
try
{
Coordination::Requests ops;
for (size_t i = 0; i < dst_parts.size(); ++i)
{
getCommitPartOps(ops, dst_parts[i], block_id_paths[i]);
ephemeral_locks[i].getUnlockOps(ops);
}
if (auto txn = query_context->getZooKeeperMetadataTransaction())
txn->moveOpsTo(ops);
delimiting_block_lock->getUnlockOps(ops);
/// Check and update version to avoid race with DROP_RANGE
ops.emplace_back(zkutil::makeSetRequest(alter_partition_version_path, "", alter_partition_version_stat.version));
/// Just update version, because merges assignment relies on it
ops.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "log", "", -1));
ops.emplace_back(zkutil::makeCreateRequest(fs::path(zookeeper_path) / "log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential));
Transaction transaction(*this);
{
auto data_parts_lock = lockParts();
for (MutableDataPartPtr & part : dst_parts)
renameTempPartAndReplace(part, nullptr, &transaction, data_parts_lock);
}
Coordination::Error code = zookeeper->tryMulti(ops, op_results);
if (code == Coordination::Error::ZOK)
delimiting_block_lock->assumeUnlocked();
else if (code == Coordination::Error::ZBADVERSION)
{
/// Cannot retry automatically, because some zookeeper ops were lost on the first attempt. Will retry on DDLWorker-level.
if (query_context->getZooKeeperMetadataTransaction())
throw Exception(
"Cannot execute alter, because alter partition version was suddenly changed due to concurrent alter",
ErrorCodes::CANNOT_ASSIGN_ALTER);
continue;
}
else
zkutil::KeeperMultiException::check(code, ops, op_results);
{
auto data_parts_lock = lockParts();
transaction.commit(&data_parts_lock);
if (replace)
parts_to_remove = removePartsInRangeFromWorkingSet(drop_range, true, data_parts_lock);
}
PartLog::addNewParts(getContext(), dst_parts, watch.elapsed());
}
catch (...)
{
PartLog::addNewParts(getContext(), dst_parts, watch.elapsed(), ExecutionStatus::fromCurrentException());
throw;
}
String log_znode_path = dynamic_cast<const Coordination::CreateResponse &>(*op_results.back()).path_created;
entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
for (auto & lock : ephemeral_locks)
lock.assumeUnlocked();
/// Forcibly remove replaced parts from ZooKeeper
tryRemovePartsFromZooKeeperWithRetries(parts_to_remove);
/// Speedup removing of replaced parts from filesystem
parts_to_remove.clear();
cleanup_thread.wakeup();
lock2.reset();
lock1.reset();
waitForLogEntryToBeProcessedIfNecessary(entry, query_context);
return;
}
String log_znode_path = dynamic_cast<const Coordination::CreateResponse &>(*op_results.back()).path_created;
entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
for (auto & lock : ephemeral_locks)
lock.assumeUnlocked();
/// Forcibly remove replaced parts from ZooKeeper
tryRemovePartsFromZooKeeperWithRetries(parts_to_remove);
/// Speedup removing of replaced parts from filesystem
parts_to_remove.clear();
cleanup_thread.wakeup();
lock2.reset();
lock1.reset();
waitForLogEntryToBeProcessedIfNecessary(entry, query_context);
throw Exception(
ErrorCodes::CANNOT_ASSIGN_ALTER, "Cannot assign ALTER PARTITION, because another ALTER PARTITION query was concurrently executed");
}
void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, ContextPtr query_context)
@ -6464,199 +6467,201 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
/// A range for log entry to remove parts from the source table (myself).
auto zookeeper = getZooKeeper();
String alter_partition_version_path = zookeeper_path + "/alter_partition_version";
Coordination::Stat alter_partition_version_stat;
zookeeper->get(alter_partition_version_path, &alter_partition_version_stat);
MergeTreePartInfo drop_range;
std::optional<EphemeralLockInZooKeeper> delimiting_block_lock;
getFakePartCoveringAllPartsInPartition(partition_id, drop_range, delimiting_block_lock, true);
String drop_range_fake_part_name = getPartNamePossiblyFake(format_version, drop_range);
DataPartPtr covering_part;
DataPartsVector src_all_parts;
/// Retry if alter_partition_version changes
for (size_t retry = 0; retry < 1000; ++retry)
{
/// NOTE: Some covered parts may be missing in src_all_parts if corresponding log entries are not executed yet.
auto parts_lock = src_data.lockParts();
src_all_parts = src_data.getActivePartsToReplace(drop_range, drop_range_fake_part_name, covering_part, parts_lock);
}
String alter_partition_version_path = zookeeper_path + "/alter_partition_version";
Coordination::Stat alter_partition_version_stat;
zookeeper->get(alter_partition_version_path, &alter_partition_version_stat);
if (covering_part)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Got part {} covering drop range {}, it's a bug",
covering_part->name, drop_range_fake_part_name);
MergeTreePartInfo drop_range;
std::optional<EphemeralLockInZooKeeper> delimiting_block_lock;
getFakePartCoveringAllPartsInPartition(partition_id, drop_range, delimiting_block_lock, true);
String drop_range_fake_part_name = getPartNamePossiblyFake(format_version, drop_range);
/// After allocating block number for drop_range we must ensure that it does not intersect block numbers
/// allocated by concurrent REPLACE query.
/// We could check it in multi-request atomically with creation of DROP_RANGE entry in source table log,
/// but it's better to check it here and fail as early as possible (before we have done something to destination table).
Coordination::Error version_check_code = zookeeper->trySet(alter_partition_version_path, "", alter_partition_version_stat.version);
if (version_check_code != Coordination::Error::ZOK)
throw Exception(ErrorCodes::CANNOT_ASSIGN_ALTER, "Cannot DROP PARTITION in {} after copying partition to {}, "
"because another ALTER PARTITION query was concurrently executed",
getStorageID().getFullTableName(), dest_table_storage->getStorageID().getFullTableName());
DataPartsVector src_parts;
MutableDataPartsVector dst_parts;
Strings block_id_paths;
Strings part_checksums;
std::vector<EphemeralLockInZooKeeper> ephemeral_locks;
LOG_DEBUG(log, "Cloning {} parts", src_all_parts.size());
static const String TMP_PREFIX = "tmp_move_from_";
/// Clone parts into destination table.
String dest_alter_partition_version_path = dest_table_storage->zookeeper_path + "/alter_partition_version";
Coordination::Stat dest_alter_partition_version_stat;
zookeeper->get(dest_alter_partition_version_path, &dest_alter_partition_version_stat);
for (const auto & src_part : src_all_parts)
{
if (!dest_table_storage->canReplacePartition(src_part))
throw Exception(
"Cannot move partition '" + partition_id + "' because part '" + src_part->name + "' has inconsistent granularity with table",
ErrorCodes::LOGICAL_ERROR);
String hash_hex = src_part->checksums.getTotalChecksumHex();
String block_id_path;
auto lock = dest_table_storage->allocateBlockNumber(partition_id, zookeeper, block_id_path);
if (!lock)
DataPartPtr covering_part;
DataPartsVector src_all_parts;
{
LOG_INFO(log, "Part {} (hash {}) has been already attached", src_part->name, hash_hex);
continue;
/// NOTE: Some covered parts may be missing in src_all_parts if corresponding log entries are not executed yet.
auto parts_lock = src_data.lockParts();
src_all_parts = src_data.getActivePartsToReplace(drop_range, drop_range_fake_part_name, covering_part, parts_lock);
}
UInt64 index = lock->getNumber();
MergeTreePartInfo dst_part_info(partition_id, index, index, src_part->info.level);
auto dst_part = dest_table_storage->cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, dest_metadata_snapshot);
if (covering_part)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Got part {} covering drop range {}, it's a bug",
covering_part->name, drop_range_fake_part_name);
src_parts.emplace_back(src_part);
dst_parts.emplace_back(dst_part);
ephemeral_locks.emplace_back(std::move(*lock));
block_id_paths.emplace_back(block_id_path);
part_checksums.emplace_back(hash_hex);
}
/// After allocating block number for drop_range we must ensure that it does not intersect block numbers
/// allocated by concurrent REPLACE query.
/// We could check it in multi-request atomically with creation of DROP_RANGE entry in source table log,
/// but it's better to check it here and fail as early as possible (before we have done something to destination table).
Coordination::Error version_check_code = zookeeper->trySet(alter_partition_version_path, "", alter_partition_version_stat.version);
if (version_check_code != Coordination::Error::ZOK)
throw Exception(ErrorCodes::CANNOT_ASSIGN_ALTER, "Cannot DROP PARTITION in {} after copying partition to {}, "
"because another ALTER PARTITION query was concurrently executed",
getStorageID().getFullTableName(), dest_table_storage->getStorageID().getFullTableName());
ReplicatedMergeTreeLogEntryData entry_delete;
{
entry_delete.type = LogEntry::DROP_RANGE;
entry_delete.source_replica = replica_name;
entry_delete.new_part_name = drop_range_fake_part_name;
entry_delete.detach = false; //-V1048
entry_delete.create_time = time(nullptr);
}
DataPartsVector src_parts;
MutableDataPartsVector dst_parts;
Strings block_id_paths;
Strings part_checksums;
std::vector<EphemeralLockInZooKeeper> ephemeral_locks;
ReplicatedMergeTreeLogEntryData entry;
{
MergeTreePartInfo drop_range_dest = makeDummyDropRangeForMovePartitionOrAttachPartitionFrom(partition_id);
LOG_DEBUG(log, "Cloning {} parts", src_all_parts.size());
entry.type = ReplicatedMergeTreeLogEntryData::REPLACE_RANGE;
entry.source_replica = dest_table_storage->replica_name;
entry.create_time = time(nullptr);
entry.replace_range_entry = std::make_shared<ReplicatedMergeTreeLogEntryData::ReplaceRangeEntry>();
static const String TMP_PREFIX = "tmp_move_from_";
auto & entry_replace = *entry.replace_range_entry;
entry_replace.drop_range_part_name = getPartNamePossiblyFake(format_version, drop_range_dest);
entry_replace.from_database = src_data_id.database_name;
entry_replace.from_table = src_data_id.table_name;
for (const auto & part : src_parts)
entry_replace.src_part_names.emplace_back(part->name);
for (const auto & part : dst_parts)
entry_replace.new_part_names.emplace_back(part->name);
for (const String & checksum : part_checksums)
entry_replace.part_names_checksums.emplace_back(checksum);
entry_replace.columns_version = -1;
}
clearBlocksInPartition(*zookeeper, drop_range.partition_id, drop_range.max_block, drop_range.max_block);
DataPartsVector parts_to_remove;
Coordination::Responses op_results;
try
{
Coordination::Requests ops;
for (size_t i = 0; i < dst_parts.size(); ++i)
/// Clone parts into destination table.
String dest_alter_partition_version_path = dest_table_storage->zookeeper_path + "/alter_partition_version";
Coordination::Stat dest_alter_partition_version_stat;
zookeeper->get(dest_alter_partition_version_path, &dest_alter_partition_version_stat);
for (const auto & src_part : src_all_parts)
{
dest_table_storage->getCommitPartOps(ops, dst_parts[i], block_id_paths[i]);
ephemeral_locks[i].getUnlockOps(ops);
if (!dest_table_storage->canReplacePartition(src_part))
throw Exception(
"Cannot move partition '" + partition_id + "' because part '" + src_part->name + "' has inconsistent granularity with table",
ErrorCodes::LOGICAL_ERROR);
if (ops.size() > zkutil::MULTI_BATCH_SIZE)
String hash_hex = src_part->checksums.getTotalChecksumHex();
String block_id_path;
auto lock = dest_table_storage->allocateBlockNumber(partition_id, zookeeper, block_id_path);
if (!lock)
{
zookeeper->multi(ops);
ops.clear();
LOG_INFO(log, "Part {} (hash {}) has been already attached", src_part->name, hash_hex);
continue;
}
UInt64 index = lock->getNumber();
MergeTreePartInfo dst_part_info(partition_id, index, index, src_part->info.level);
auto dst_part = dest_table_storage->cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, dest_metadata_snapshot);
src_parts.emplace_back(src_part);
dst_parts.emplace_back(dst_part);
ephemeral_locks.emplace_back(std::move(*lock));
block_id_paths.emplace_back(block_id_path);
part_checksums.emplace_back(hash_hex);
}
/// Check and update version to avoid race with DROP_RANGE
ops.emplace_back(zkutil::makeSetRequest(dest_alter_partition_version_path, "", dest_alter_partition_version_stat.version));
/// Just update version, because merges assignment relies on it
ops.emplace_back(zkutil::makeSetRequest(fs::path(dest_table_storage->zookeeper_path) / "log", "", -1));
ops.emplace_back(zkutil::makeCreateRequest(fs::path(dest_table_storage->zookeeper_path) / "log/log-",
entry.toString(), zkutil::CreateMode::PersistentSequential));
ReplicatedMergeTreeLogEntryData entry_delete;
{
Transaction transaction(*dest_table_storage);
auto src_data_parts_lock = lockParts();
auto dest_data_parts_lock = dest_table_storage->lockParts();
std::mutex mutex;
DataPartsLock lock(mutex);
for (MutableDataPartPtr & part : dst_parts)
dest_table_storage->renameTempPartAndReplace(part, nullptr, &transaction, lock);
Coordination::Error code = zookeeper->tryMulti(ops, op_results);
if (code == Coordination::Error::ZBADVERSION)
throw Exception(ErrorCodes::CANNOT_ASSIGN_ALTER, "Cannot assign ALTER PARTITION, because another ALTER PARTITION query was concurrently executed");
else
zkutil::KeeperMultiException::check(code, ops, op_results);
parts_to_remove = removePartsInRangeFromWorkingSet(drop_range, true, lock);
transaction.commit(&lock);
entry_delete.type = LogEntry::DROP_RANGE;
entry_delete.source_replica = replica_name;
entry_delete.new_part_name = drop_range_fake_part_name;
entry_delete.detach = false; //-V1048
entry_delete.create_time = time(nullptr);
}
PartLog::addNewParts(getContext(), dst_parts, watch.elapsed());
}
catch (...)
{
PartLog::addNewParts(getContext(), dst_parts, watch.elapsed(), ExecutionStatus::fromCurrentException());
throw;
ReplicatedMergeTreeLogEntryData entry;
{
MergeTreePartInfo drop_range_dest = makeDummyDropRangeForMovePartitionOrAttachPartitionFrom(partition_id);
entry.type = ReplicatedMergeTreeLogEntryData::REPLACE_RANGE;
entry.source_replica = dest_table_storage->replica_name;
entry.create_time = time(nullptr);
entry.replace_range_entry = std::make_shared<ReplicatedMergeTreeLogEntryData::ReplaceRangeEntry>();
auto & entry_replace = *entry.replace_range_entry;
entry_replace.drop_range_part_name = getPartNamePossiblyFake(format_version, drop_range_dest);
entry_replace.from_database = src_data_id.database_name;
entry_replace.from_table = src_data_id.table_name;
for (const auto & part : src_parts)
entry_replace.src_part_names.emplace_back(part->name);
for (const auto & part : dst_parts)
entry_replace.new_part_names.emplace_back(part->name);
for (const String & checksum : part_checksums)
entry_replace.part_names_checksums.emplace_back(checksum);
entry_replace.columns_version = -1;
}
clearBlocksInPartition(*zookeeper, drop_range.partition_id, drop_range.max_block, drop_range.max_block);
DataPartsVector parts_to_remove;
Coordination::Responses op_results;
try
{
Coordination::Requests ops;
for (size_t i = 0; i < dst_parts.size(); ++i)
{
dest_table_storage->getCommitPartOps(ops, dst_parts[i], block_id_paths[i]);
ephemeral_locks[i].getUnlockOps(ops);
}
/// Check and update version to avoid race with DROP_RANGE
ops.emplace_back(zkutil::makeSetRequest(dest_alter_partition_version_path, "", dest_alter_partition_version_stat.version));
/// Just update version, because merges assignment relies on it
ops.emplace_back(zkutil::makeSetRequest(fs::path(dest_table_storage->zookeeper_path) / "log", "", -1));
ops.emplace_back(zkutil::makeCreateRequest(fs::path(dest_table_storage->zookeeper_path) / "log/log-",
entry.toString(), zkutil::CreateMode::PersistentSequential));
{
Transaction transaction(*dest_table_storage);
auto src_data_parts_lock = lockParts();
auto dest_data_parts_lock = dest_table_storage->lockParts();
std::mutex mutex;
DataPartsLock lock(mutex);
for (MutableDataPartPtr & part : dst_parts)
dest_table_storage->renameTempPartAndReplace(part, nullptr, &transaction, lock);
Coordination::Error code = zookeeper->tryMulti(ops, op_results);
if (code == Coordination::Error::ZBADVERSION)
continue;
else
zkutil::KeeperMultiException::check(code, ops, op_results);
parts_to_remove = removePartsInRangeFromWorkingSet(drop_range, true, lock);
transaction.commit(&lock);
}
PartLog::addNewParts(getContext(), dst_parts, watch.elapsed());
}
catch (...)
{
PartLog::addNewParts(getContext(), dst_parts, watch.elapsed(), ExecutionStatus::fromCurrentException());
throw;
}
String log_znode_path = dynamic_cast<const Coordination::CreateResponse &>(*op_results.back()).path_created;
entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
for (auto & lock : ephemeral_locks)
lock.assumeUnlocked();
tryRemovePartsFromZooKeeperWithRetries(parts_to_remove);
parts_to_remove.clear();
cleanup_thread.wakeup();
lock2.reset();
dest_table_storage->waitForLogEntryToBeProcessedIfNecessary(entry, query_context);
/// Create DROP_RANGE for the source table
Coordination::Requests ops_src;
ops_src.emplace_back(zkutil::makeCreateRequest(
fs::path(zookeeper_path) / "log/log-", entry_delete.toString(), zkutil::CreateMode::PersistentSequential));
/// Just update version, because merges assignment relies on it
ops_src.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "log", "", -1));
delimiting_block_lock->getUnlockOps(ops_src);
op_results = zookeeper->multi(ops_src);
log_znode_path = dynamic_cast<const Coordination::CreateResponse &>(*op_results.front()).path_created;
entry_delete.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
lock1.reset();
waitForLogEntryToBeProcessedIfNecessary(entry_delete, query_context);
/// Cleaning possibly stored information about parts from /quorum/last_part node in ZooKeeper.
cleanLastPartNode(partition_id);
return;
}
String log_znode_path = dynamic_cast<const Coordination::CreateResponse &>(*op_results.back()).path_created;
entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
for (auto & lock : ephemeral_locks)
lock.assumeUnlocked();
tryRemovePartsFromZooKeeperWithRetries(parts_to_remove);
parts_to_remove.clear();
cleanup_thread.wakeup();
lock2.reset();
dest_table_storage->waitForLogEntryToBeProcessedIfNecessary(entry, query_context);
/// Create DROP_RANGE for the source table
Coordination::Requests ops_src;
ops_src.emplace_back(zkutil::makeCreateRequest(
fs::path(zookeeper_path) / "log/log-", entry_delete.toString(), zkutil::CreateMode::PersistentSequential));
/// Just update version, because merges assignment relies on it
ops_src.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "log", "", -1));
delimiting_block_lock->getUnlockOps(ops_src);
op_results = zookeeper->multi(ops_src);
log_znode_path = dynamic_cast<const Coordination::CreateResponse &>(*op_results.front()).path_created;
entry_delete.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
lock1.reset();
waitForLogEntryToBeProcessedIfNecessary(entry_delete, query_context);
/// Cleaning possibly stored information about parts from /quorum/last_part node in ZooKeeper.
cleanLastPartNode(partition_id);
throw Exception(ErrorCodes::CANNOT_ASSIGN_ALTER, "Cannot assign ALTER PARTITION, because another ALTER PARTITION query was concurrently executed");
}
void StorageReplicatedMergeTree::movePartitionToShard(
@ -6984,68 +6989,80 @@ bool StorageReplicatedMergeTree::dropPartImpl(
/// Puts a DROP_RANGE entry covering all parts of `partition_id` into the replication log.
///
/// The whole sequence (read alter_partition_version, allocate the delimiting block number,
/// clear deduplication blocks, build the entry, commit everything in a single multi-request)
/// is repeated if the multi-request fails with ZBADVERSION, i.e. if alter_partition_version
/// was changed between the read and the commit.
///
/// Parameters:
///   zookeeper     - session used for all requests issued here.
///   partition_id  - partition to drop.
///   entry         - filled in place: type, source_replica, new_part_name, detach, create_time
///                   and, on success, znode_name of the created log node.
///   query_context - if it carries a ZooKeeper metadata transaction, the transaction's ops are
///                   moved into the multi-request, and the request is not retried here.
///   detach        - copied into entry.detach.
///
/// Returns false (without creating a log entry) if getFakePartCoveringAllPartsInPartition
/// reports that there is nothing to cover; returns true once the log entry has been created.
///
/// Throws CANNOT_ASSIGN_ALTER if the version check fails while a metadata transaction is
/// attached, or if all retries are exhausted. Other Keeper errors are passed to
/// zkutil::KeeperMultiException::check (presumably rethrown as an exception - see its definition).
bool StorageReplicatedMergeTree::dropAllPartsInPartition(
    zkutil::ZooKeeper & zookeeper, String & partition_id, LogEntry & entry, ContextPtr query_context, bool detach)
{
    /// Retry if alter_partition_version changes
    for (size_t retry = 0; retry < 1000; ++retry)
    {
        /// Remember the current version; the multi-request below succeeds only if it is still the same.
        String alter_partition_version_path = zookeeper_path + "/alter_partition_version";
        Coordination::Stat alter_partition_version_stat;
        zookeeper.get(alter_partition_version_path, &alter_partition_version_stat);

        MergeTreePartInfo drop_range_info;

        /// It would prevent other replicas from assigning merges which intersect locked block number.
        std::optional<EphemeralLockInZooKeeper> delimiting_block_lock;

        if (!getFakePartCoveringAllPartsInPartition(partition_id, drop_range_info, delimiting_block_lock))
        {
            LOG_INFO(log, "Will not drop partition {}, it is empty.", partition_id);
            return false;
        }

        clearBlocksInPartition(zookeeper, partition_id, drop_range_info.min_block, drop_range_info.max_block);

        String drop_range_fake_part_name = getPartNamePossiblyFake(format_version, drop_range_info);

        LOG_DEBUG(log, "Disabled merges covered by range {}", drop_range_fake_part_name);

        /// Finally, having achieved the necessary invariants, you can put an entry in the log.
        entry.type = LogEntry::DROP_RANGE;
        entry.source_replica = replica_name;
        entry.new_part_name = drop_range_fake_part_name;
        entry.detach = detach;
        entry.create_time = time(nullptr);

        Coordination::Requests ops;

        /// NOTE: This request must stay first: its response is read via responses.front() below.
        ops.emplace_back(zkutil::makeCreateRequest(fs::path(zookeeper_path) / "log/log-", entry.toString(),
                                                   zkutil::CreateMode::PersistentSequential));

        /// Check and update version to avoid race with REPLACE_RANGE.
        /// Otherwise new parts covered by drop_range_info may appear after execution of current DROP_RANGE entry
        /// as a result of execution of concurrently created REPLACE_RANGE entry.
        ops.emplace_back(zkutil::makeSetRequest(alter_partition_version_path, "", alter_partition_version_stat.version));

        /// Just update version, because merges assignment relies on it
        ops.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "log", "", -1));

        delimiting_block_lock->getUnlockOps(ops);

        if (auto txn = query_context->getZooKeeperMetadataTransaction())
            txn->moveOpsTo(ops);

        Coordination::Responses responses;
        Coordination::Error code = zookeeper.tryMulti(ops, responses);

        if (code == Coordination::Error::ZOK)
        {
            /// The unlock ops were part of the committed multi-request.
            delimiting_block_lock->assumeUnlocked();
        }
        else if (code == Coordination::Error::ZBADVERSION)
        {
            /// Cannot retry automatically, because some zookeeper ops were lost on the first attempt. Will retry on DDLWorker-level.
            if (query_context->getZooKeeperMetadataTransaction())
                throw Exception(
                    "Cannot execute alter, because alter partition version was suddenly changed due to concurrent alter",
                    ErrorCodes::CANNOT_ASSIGN_ALTER);

            /// Start over with a fresh version and a fresh delimiting block number.
            continue;
        }
        else
            zkutil::KeeperMultiException::check(code, ops, responses);

        String log_znode_path = dynamic_cast<const Coordination::CreateResponse &>(*responses.front()).path_created;
        entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);

        getContext()->getMergeList().cancelInPartition(getStorageID(), partition_id, drop_range_info.max_block);

        return true;
    }

    /// Every attempt failed the version check.
    throw Exception(ErrorCodes::CANNOT_ASSIGN_ALTER,
        "Cannot assign ALTER PARTITION because another ALTER PARTITION query was concurrently executed");
}