Merge pull request #61987 from allmazz/feat/59376

support ATTACH PARTITION `ALL` FROM `TABLE`
This commit is contained in:
Alexander Tokmakov 2024-09-05 14:57:38 +00:00 committed by GitHub
commit b6572e36b4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 131 additions and 42 deletions

View File

@ -351,7 +351,7 @@ ALTER TABLE mt DELETE IN PARTITION ID '2' WHERE p = 2;
You can specify the partition expression in `ALTER ... PARTITION` queries in different ways:
- As a value from the `partition` column of the `system.parts` table. For example, `ALTER TABLE visits DETACH PARTITION 201901`.
- Using the keyword `ALL`. It can be used only with DROP/DETACH/ATTACH. For example, `ALTER TABLE visits ATTACH PARTITION ALL`.
- Using the keyword `ALL`. It can be used only with DROP/DETACH/ATTACH/ATTACH FROM. For example, `ALTER TABLE visits ATTACH PARTITION ALL`.
- As a tuple of expressions or constants that matches (in types) the table partitioning keys tuple. In the case of a single element partitioning key, the expression should be wrapped in the `tuple (...)` function. For example, `ALTER TABLE visits DETACH PARTITION tuple(toYYYYMM(toDate('2019-01-25')))`.
- Using the partition ID. Partition ID is a string identifier of the partition (human-readable, if possible) that is used as the names of partitions in the file system and in ZooKeeper. The partition ID must be specified in the `PARTITION ID` clause, in single quotes. For example, `ALTER TABLE visits DETACH PARTITION ID '201901'`.
- In the [ALTER ATTACH PART](#attach-partitionpart) and [DROP DETACHED PART](#drop-detached-partitionpart) queries, to specify the name of a part, use a string literal with a value from the `name` column of the [system.detached_parts](/docs/en/operations/system-tables/detached_parts.md/#system_tables-detached_parts) table. For example, `ALTER TABLE visits ATTACH PART '201901_1_1_0'`.

View File

@ -5009,7 +5009,7 @@ void MergeTreeData::checkAlterPartitionIsPossible(
const auto * partition_ast = command.partition->as<ASTPartition>();
if (partition_ast && partition_ast->all)
{
if (command.type != PartitionCommand::DROP_PARTITION && command.type != PartitionCommand::ATTACH_PARTITION)
if (command.type != PartitionCommand::DROP_PARTITION && command.type != PartitionCommand::ATTACH_PARTITION && !(command.type == PartitionCommand::REPLACE_PARTITION && !command.replace))
throw DB::Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Only support DROP/DETACH/ATTACH PARTITION ALL currently");
}
else
@ -5810,7 +5810,7 @@ String MergeTreeData::getPartitionIDFromQuery(const ASTPtr & ast, ContextPtr loc
const auto & partition_ast = ast->as<ASTPartition &>();
if (partition_ast.all)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Only Support DETACH PARTITION ALL currently");
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Only Support DROP/DETACH/ATTACH PARTITION ALL currently");
if (!partition_ast.value)
{

View File

@ -2090,9 +2090,22 @@ void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, con
ProfileEventsScope profile_events_scope;
MergeTreeData & src_data = checkStructureAndGetMergeTreeData(source_table, source_metadata_snapshot, my_metadata_snapshot);
String partition_id = getPartitionIDFromQuery(partition, local_context);
DataPartsVector src_parts;
String partition_id;
bool is_all = partition->as<ASTPartition>()->all;
if (is_all)
{
if (replace)
throw DB::Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Only support DROP/DETACH/ATTACH PARTITION ALL currently");
src_parts = src_data.getVisibleDataPartsVector(local_context);
}
else
{
partition_id = getPartitionIDFromQuery(partition, local_context);
src_parts = src_data.getVisibleDataPartsVectorInPartition(local_context, partition_id);
}
DataPartsVector src_parts = src_data.getVisibleDataPartsVectorInPartition(local_context, partition_id);
MutableDataPartsVector dst_parts;
std::vector<scope_guard> dst_parts_locks;
@ -2100,6 +2113,9 @@ void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, con
for (const DataPartPtr & src_part : src_parts)
{
if (is_all)
partition_id = src_part->partition.getID(src_data);
if (!canReplacePartition(src_part))
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Cannot replace partition '{}' because part '{}' has inconsistent granularity with table",

View File

@ -8033,24 +8033,77 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
/// First argument is true, because we possibly will add new data to current table.
auto lock1 = lockForShare(query_context->getCurrentQueryId(), query_context->getSettingsRef().lock_acquire_timeout);
auto lock2 = source_table->lockForShare(query_context->getCurrentQueryId(), query_context->getSettingsRef().lock_acquire_timeout);
auto storage_settings_ptr = getSettings();
auto source_metadata_snapshot = source_table->getInMemoryMetadataPtr();
auto metadata_snapshot = getInMemoryMetadataPtr();
const auto storage_settings_ptr = getSettings();
const auto source_metadata_snapshot = source_table->getInMemoryMetadataPtr();
const auto metadata_snapshot = getInMemoryMetadataPtr();
const MergeTreeData & src_data = checkStructureAndGetMergeTreeData(source_table, source_metadata_snapshot, metadata_snapshot);
Stopwatch watch;
std::unordered_set<String> partitions;
if (partition->as<ASTPartition>()->all)
{
if (replace)
throw DB::Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Only support DROP/DETACH/ATTACH PARTITION ALL currently");
partitions = src_data.getAllPartitionIds();
}
else
{
partitions = std::unordered_set<String>();
partitions.emplace(getPartitionIDFromQuery(partition, query_context));
}
LOG_INFO(log, "Will try to attach {} partitions", partitions.size());
const Stopwatch watch;
ProfileEventsScope profile_events_scope;
const auto zookeeper = getZooKeeper();
MergeTreeData & src_data = checkStructureAndGetMergeTreeData(source_table, source_metadata_snapshot, metadata_snapshot);
String partition_id = getPartitionIDFromQuery(partition, query_context);
const bool zero_copy_enabled = storage_settings_ptr->allow_remote_fs_zero_copy_replication
|| dynamic_cast<const MergeTreeData *>(source_table.get())->getSettings()->allow_remote_fs_zero_copy_replication;
std::unique_ptr<ReplicatedMergeTreeLogEntryData> entries[partitions.size()];
size_t idx = 0;
for (const auto & partition_id : partitions)
{
entries[idx] = replacePartitionFromImpl(watch,
profile_events_scope,
metadata_snapshot,
src_data,
partition_id,
zookeeper,
replace,
zero_copy_enabled,
storage_settings_ptr->always_use_copy_instead_of_hardlinks,
query_context);
++idx;
}
for (const auto & entry : entries)
waitForLogEntryToBeProcessedIfNecessary(*entry, query_context);
}
std::unique_ptr<ReplicatedMergeTreeLogEntryData> StorageReplicatedMergeTree::replacePartitionFromImpl(
const Stopwatch & watch,
ProfileEventsScope & profile_events_scope,
const StorageMetadataPtr & metadata_snapshot,
const MergeTreeData & src_data,
const String & partition_id,
const ZooKeeperPtr & zookeeper,
bool replace,
const bool & zero_copy_enabled,
const bool & always_use_copy_instead_of_hardlinks,
const ContextPtr & query_context)
{
/// NOTE: Some covered parts may be missing in src_all_parts if corresponding log entries are not executed yet.
DataPartsVector src_all_parts = src_data.getVisibleDataPartsVectorInPartition(query_context, partition_id);
LOG_DEBUG(log, "Cloning {} parts", src_all_parts.size());
static const String TMP_PREFIX = "tmp_replace_from_";
auto zookeeper = getZooKeeper();
std::optional<ZooKeeperMetadataTransaction> txn;
if (auto query_txn = query_context->getZooKeeperMetadataTransaction())
txn.emplace(query_txn->getZooKeeper(),
query_txn->getDatabaseZooKeeperPath(),
query_txn->isInitialQuery(),
query_txn->getTaskZooKeeperPath());
/// Retry if alter_partition_version changes
for (size_t retry = 0; retry < 1000; ++retry)
@ -8136,11 +8189,9 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
UInt64 index = lock->getNumber();
MergeTreePartInfo dst_part_info(partition_id, index, index, src_part->info.level);
bool zero_copy_enabled = storage_settings_ptr->allow_remote_fs_zero_copy_replication
|| dynamic_cast<const MergeTreeData *>(source_table.get())->getSettings()->allow_remote_fs_zero_copy_replication;
IDataPartStorage::ClonePartParams clone_params
{
.copy_instead_of_hardlink = storage_settings_ptr->always_use_copy_instead_of_hardlinks || (zero_copy_enabled && src_part->isStoredOnRemoteDiskWithZeroCopySupport()),
.copy_instead_of_hardlink = always_use_copy_instead_of_hardlinks || (zero_copy_enabled && src_part->isStoredOnRemoteDiskWithZeroCopySupport()),
.metadata_version_to_write = metadata_snapshot->getMetadataVersion()
};
if (replace)
@ -8148,7 +8199,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
/// Replace can only work on the same disk
auto [dst_part, part_lock] = cloneAndLoadDataPart(
src_part,
TMP_PREFIX,
TMP_PREFIX_REPLACE_PARTITION_FROM,
dst_part_info,
metadata_snapshot,
clone_params,
@ -8163,7 +8214,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
/// Attach can work on another disk
auto [dst_part, part_lock] = cloneAndLoadDataPart(
src_part,
TMP_PREFIX,
TMP_PREFIX_REPLACE_PARTITION_FROM,
dst_part_info,
metadata_snapshot,
clone_params,
@ -8179,15 +8230,15 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
part_checksums.emplace_back(hash_hex);
}
ReplicatedMergeTreeLogEntryData entry;
auto entry = std::make_unique<ReplicatedMergeTreeLogEntryData>();
{
auto src_table_id = src_data.getStorageID();
entry.type = ReplicatedMergeTreeLogEntryData::REPLACE_RANGE;
entry.source_replica = replica_name;
entry.create_time = time(nullptr);
entry.replace_range_entry = std::make_shared<ReplicatedMergeTreeLogEntryData::ReplaceRangeEntry>();
entry->type = ReplicatedMergeTreeLogEntryData::REPLACE_RANGE;
entry->source_replica = replica_name;
entry->create_time = time(nullptr);
entry->replace_range_entry = std::make_shared<ReplicatedMergeTreeLogEntryData::ReplaceRangeEntry>();
auto & entry_replace = *entry.replace_range_entry;
auto & entry_replace = *entry->replace_range_entry;
entry_replace.drop_range_part_name = drop_range_fake_part_name;
entry_replace.from_database = src_table_id.database_name;
entry_replace.from_table = src_table_id.table_name;
@ -8220,7 +8271,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
ephemeral_locks[i].getUnlockOp(ops);
}
if (auto txn = query_context->getZooKeeperMetadataTransaction())
if (txn)
txn->moveOpsTo(ops);
delimiting_block_lock->getUnlockOp(ops);
@ -8228,7 +8279,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
ops.emplace_back(zkutil::makeSetRequest(alter_partition_version_path, "", alter_partition_version_stat.version));
/// Just update version, because merges assignment relies on it
ops.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "log", "", -1));
ops.emplace_back(zkutil::makeCreateRequest(fs::path(zookeeper_path) / "log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential));
ops.emplace_back(zkutil::makeCreateRequest(fs::path(zookeeper_path) / "log/log-", entry->toString(), zkutil::CreateMode::PersistentSequential));
Transaction transaction(*this, NO_TRANSACTION_RAW);
{
@ -8278,14 +8329,11 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
}
String log_znode_path = dynamic_cast<const Coordination::CreateResponse &>(*op_results.back()).path_created;
entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
entry->znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
for (auto & lock : ephemeral_locks)
lock.assumeUnlocked();
lock2.reset();
lock1.reset();
/// We need to pull the REPLACE_RANGE before cleaning the replaced parts (otherwise CheckThread may decide that parts are lost)
queue.pullLogsToQueue(getZooKeeperAndAssertNotReadonly(), {}, ReplicatedMergeTreeQueue::SYNC);
// No need to block operations further, especially that in case we have to wait for mutation to finish, the intent would block
@ -8294,10 +8342,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
parts_holder.clear();
cleanup_thread.wakeup();
waitForLogEntryToBeProcessedIfNecessary(entry, query_context);
return;
return entry;
}
throw Exception(

View File

@ -37,6 +37,7 @@
#include <base/defines.h>
#include <Core/BackgroundSchedulePool.h>
#include <QueryPipeline/Pipe.h>
#include <Common/ProfileEventsScope.h>
#include <Storages/MergeTree/BackgroundJobsAssignee.h>
#include <Parsers/SyncReplicaMode.h>
@ -1013,6 +1014,18 @@ private:
DataPartsVector::const_iterator it;
};
const String TMP_PREFIX_REPLACE_PARTITION_FROM = "tmp_replace_from_";
std::unique_ptr<ReplicatedMergeTreeLogEntryData> replacePartitionFromImpl(
const Stopwatch & watch,
ProfileEventsScope & profile_events_scope,
const StorageMetadataPtr & metadata_snapshot,
const MergeTreeData & src_data,
const String & partition_id,
const zkutil::ZooKeeperPtr & zookeeper,
bool replace,
const bool & zero_copy_enabled,
const bool & always_use_copy_instead_of_hardlinks,
const ContextPtr & query_context);
};
String getPartNamePossiblyFake(MergeTreeDataFormatVersion format_version, const MergeTreePartInfo & part_info);

View File

@ -10,11 +10,12 @@ REPLACE recursive
4 8
1
ATTACH FROM
5 8
6 8
10 12
OPTIMIZE
5 8 5
5 8 3
10 12 9
10 12 5
After restart
5 8
10 12
DETACH+ATTACH PARTITION
3 4
7 7

View File

@ -53,12 +53,16 @@ DROP TABLE src;
CREATE TABLE src (p UInt64, k String, d UInt64) ENGINE = MergeTree PARTITION BY p ORDER BY k;
INSERT INTO src VALUES (1, '0', 1);
INSERT INTO src VALUES (1, '1', 1);
INSERT INTO src VALUES (2, '2', 1);
INSERT INTO src VALUES (3, '3', 1);
SYSTEM STOP MERGES dst;
INSERT INTO dst VALUES (1, '1', 2);
INSERT INTO dst VALUES (1, '1', 2), (1, '2', 0);
ALTER TABLE dst ATTACH PARTITION 1 FROM src;
SELECT count(), sum(d) FROM dst;
ALTER TABLE dst ATTACH PARTITION ALL FROM src;
SELECT count(), sum(d) FROM dst;
SELECT 'OPTIMIZE';
SELECT count(), sum(d), uniqExact(_part) FROM dst;

View File

@ -16,6 +16,7 @@ REPLACE recursive
ATTACH FROM
5 8
5 8
7 12
REPLACE with fetch
4 6
4 6

View File

@ -1,5 +1,5 @@
#!/usr/bin/env bash
# Tags: zookeeper, no-object-storage
# Tags: zookeeper, no-object-storage, long
# Because REPLACE PARTITION does not force immediate removal of replaced data parts from the local filesystem
# (it tries to do it as quickly as possible, but it is still performed asynchronously in a separate thread)
@ -82,6 +82,8 @@ $CLICKHOUSE_CLIENT --query="DROP TABLE src;"
$CLICKHOUSE_CLIENT --query="CREATE TABLE src (p UInt64, k String, d UInt64) ENGINE = MergeTree PARTITION BY p ORDER BY k;"
$CLICKHOUSE_CLIENT --query="INSERT INTO src VALUES (1, '0', 1);"
$CLICKHOUSE_CLIENT --query="INSERT INTO src VALUES (1, '1', 1);"
$CLICKHOUSE_CLIENT --query="INSERT INTO src VALUES (3, '1', 2);"
$CLICKHOUSE_CLIENT --query="INSERT INTO src VALUES (4, '1', 2);"
$CLICKHOUSE_CLIENT --query="INSERT INTO dst_r2 VALUES (1, '1', 2);"
query_with_retry "ALTER TABLE dst_r2 ATTACH PARTITION 1 FROM src;"
@ -90,6 +92,13 @@ $CLICKHOUSE_CLIENT --query="SYSTEM SYNC REPLICA dst_r1;"
$CLICKHOUSE_CLIENT --query="SELECT count(), sum(d) FROM dst_r1;"
$CLICKHOUSE_CLIENT --query="SELECT count(), sum(d) FROM dst_r2;"
query_with_retry "ALTER TABLE dst_r2 ATTACH PARTITION ALL FROM src;"
$CLICKHOUSE_CLIENT --query="SYSTEM SYNC REPLICA dst_r2;"
$CLICKHOUSE_CLIENT --query="SELECT count(), sum(d) FROM dst_r2;"
query_with_retry "ALTER TABLE dst_r2 DROP PARTITION 3;"
$CLICKHOUSE_CLIENT --query="SYSTEM SYNC REPLICA dst_r2;"
query_with_retry "ALTER TABLE dst_r2 DROP PARTITION 4;"
$CLICKHOUSE_CLIENT --query="SYSTEM SYNC REPLICA dst_r2;"
$CLICKHOUSE_CLIENT --query="SELECT 'REPLACE with fetch';"
$CLICKHOUSE_CLIENT --query="DROP TABLE src;"