Add retry to check alter_partition_version

This commit is contained in:
Amos Bird 2021-11-03 15:46:33 +08:00
parent b25b778349
commit 710f7d64e4
No known key found for this signature in database
GPG Key ID: 80D430DCBECFEDB4

View File

@ -6257,186 +6257,195 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
/// NOTE: Some covered parts may be missing in src_all_parts if corresponding log entries are not executed yet. /// NOTE: Some covered parts may be missing in src_all_parts if corresponding log entries are not executed yet.
DataPartsVector src_all_parts = src_data.getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id); DataPartsVector src_all_parts = src_data.getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id);
DataPartsVector src_parts;
MutableDataPartsVector dst_parts;
Strings block_id_paths;
Strings part_checksums;
auto zookeeper = getZooKeeper();
std::vector<EphemeralLockInZooKeeper> ephemeral_locks;
LOG_DEBUG(log, "Cloning {} parts", src_all_parts.size()); LOG_DEBUG(log, "Cloning {} parts", src_all_parts.size());
static const String TMP_PREFIX = "tmp_replace_from_"; static const String TMP_PREFIX = "tmp_replace_from_";
auto zookeeper = getZooKeeper();
String alter_partition_version_path = zookeeper_path + "/alter_partition_version"; /// Retry if alter_partition_version changes
Coordination::Stat alter_partition_version_stat; for (size_t retry = 0; retry < 1000; ++retry)
zookeeper->get(alter_partition_version_path, &alter_partition_version_stat);
/// Firstly, generate last block number and compute drop_range
/// NOTE: Even if we make ATTACH PARTITION instead of REPLACE PARTITION drop_range will not be empty, it will contain a block.
/// So, such case has special meaning, if drop_range contains only one block it means that nothing to drop.
/// TODO why not to add normal DROP_RANGE entry to replication queue if `replace` is true?
MergeTreePartInfo drop_range;
std::optional<EphemeralLockInZooKeeper> delimiting_block_lock;
bool partition_was_empty = !getFakePartCoveringAllPartsInPartition(partition_id, drop_range, delimiting_block_lock, true);
if (replace && partition_was_empty)
{ {
/// Nothing to drop, will just attach new parts DataPartsVector src_parts;
LOG_INFO(log, "Partition {} was empty, REPLACE PARTITION will work as ATTACH PARTITION FROM", drop_range.partition_id); MutableDataPartsVector dst_parts;
replace = false; Strings block_id_paths;
} Strings part_checksums;
std::vector<EphemeralLockInZooKeeper> ephemeral_locks;
String alter_partition_version_path = zookeeper_path + "/alter_partition_version";
Coordination::Stat alter_partition_version_stat;
zookeeper->get(alter_partition_version_path, &alter_partition_version_stat);
if (!replace) /// Firstly, generate last block number and compute drop_range
{ /// NOTE: Even if we make ATTACH PARTITION instead of REPLACE PARTITION drop_range will not be empty, it will contain a block.
/// It's ATTACH PARTITION FROM, not REPLACE PARTITION. We have to reset drop range /// So, such case has special meaning, if drop_range contains only one block it means that nothing to drop.
drop_range = makeDummyDropRangeForMovePartitionOrAttachPartitionFrom(partition_id); /// TODO why not to add normal DROP_RANGE entry to replication queue if `replace` is true?
} MergeTreePartInfo drop_range;
std::optional<EphemeralLockInZooKeeper> delimiting_block_lock;
assert(replace == !LogEntry::ReplaceRangeEntry::isMovePartitionOrAttachFrom(drop_range)); bool partition_was_empty = !getFakePartCoveringAllPartsInPartition(partition_id, drop_range, delimiting_block_lock, true);
if (replace && partition_was_empty)
String drop_range_fake_part_name = getPartNamePossiblyFake(format_version, drop_range);
for (const auto & src_part : src_all_parts)
{
/// We also make some kind of deduplication to avoid duplicated parts in case of ATTACH PARTITION
/// Assume that merges in the partition are quite rare
/// Save deduplication block ids with special prefix replace_partition
if (!canReplacePartition(src_part))
throw Exception(
"Cannot replace partition '" + partition_id + "' because part '" + src_part->name + "' has inconsistent granularity with table",
ErrorCodes::LOGICAL_ERROR);
String hash_hex = src_part->checksums.getTotalChecksumHex();
if (replace)
LOG_INFO(log, "Trying to replace {} with hash_hex {}", src_part->name, hash_hex);
else
LOG_INFO(log, "Trying to attach {} with hash_hex {}", src_part->name, hash_hex);
String block_id_path = replace ? "" : (fs::path(zookeeper_path) / "blocks" / (partition_id + "_replace_from_" + hash_hex));
auto lock = allocateBlockNumber(partition_id, zookeeper, block_id_path);
if (!lock)
{ {
LOG_INFO(log, "Part {} (hash {}) has been already attached", src_part->name, hash_hex); /// Nothing to drop, will just attach new parts
continue; LOG_INFO(log, "Partition {} was empty, REPLACE PARTITION will work as ATTACH PARTITION FROM", drop_range.partition_id);
replace = false;
} }
UInt64 index = lock->getNumber(); if (!replace)
MergeTreePartInfo dst_part_info(partition_id, index, index, src_part->info.level);
auto dst_part = cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, metadata_snapshot);
src_parts.emplace_back(src_part);
dst_parts.emplace_back(dst_part);
ephemeral_locks.emplace_back(std::move(*lock));
block_id_paths.emplace_back(block_id_path);
part_checksums.emplace_back(hash_hex);
}
ReplicatedMergeTreeLogEntryData entry;
{
auto src_table_id = src_data.getStorageID();
entry.type = ReplicatedMergeTreeLogEntryData::REPLACE_RANGE;
entry.source_replica = replica_name;
entry.create_time = time(nullptr);
entry.replace_range_entry = std::make_shared<ReplicatedMergeTreeLogEntryData::ReplaceRangeEntry>();
auto & entry_replace = *entry.replace_range_entry;
entry_replace.drop_range_part_name = drop_range_fake_part_name;
entry_replace.from_database = src_table_id.database_name;
entry_replace.from_table = src_table_id.table_name;
for (const auto & part : src_parts)
entry_replace.src_part_names.emplace_back(part->name);
for (const auto & part : dst_parts)
entry_replace.new_part_names.emplace_back(part->name);
for (const String & checksum : part_checksums)
entry_replace.part_names_checksums.emplace_back(checksum);
entry_replace.columns_version = -1;
}
/// Remove deduplication block_ids of replacing parts
if (replace)
clearBlocksInPartition(*zookeeper, drop_range.partition_id, drop_range.max_block, drop_range.max_block);
DataPartsVector parts_to_remove;
Coordination::Responses op_results;
try
{
Coordination::Requests ops;
for (size_t i = 0; i < dst_parts.size(); ++i)
{ {
getCommitPartOps(ops, dst_parts[i], block_id_paths[i]); /// It's ATTACH PARTITION FROM, not REPLACE PARTITION. We have to reset drop range
ephemeral_locks[i].getUnlockOps(ops); drop_range = makeDummyDropRangeForMovePartitionOrAttachPartitionFrom(partition_id);
if (ops.size() > zkutil::MULTI_BATCH_SIZE)
{
/// It is unnecessary to add parts to working set until we commit log entry
zookeeper->multi(ops);
ops.clear();
}
} }
if (auto txn = query_context->getZooKeeperMetadataTransaction()) assert(replace == !LogEntry::ReplaceRangeEntry::isMovePartitionOrAttachFrom(drop_range));
txn->moveOpsTo(ops);
delimiting_block_lock->getUnlockOps(ops); String drop_range_fake_part_name = getPartNamePossiblyFake(format_version, drop_range);
/// Check and update version to avoid race with DROP_RANGE
ops.emplace_back(zkutil::makeSetRequest(alter_partition_version_path, "", alter_partition_version_stat.version));
/// Just update version, because merges assignment relies on it
ops.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "log", "", -1));
ops.emplace_back(zkutil::makeCreateRequest(fs::path(zookeeper_path) / "log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential));
Transaction transaction(*this); for (const auto & src_part : src_all_parts)
{ {
auto data_parts_lock = lockParts(); /// We also make some kind of deduplication to avoid duplicated parts in case of ATTACH PARTITION
/// Assume that merges in the partition are quite rare
/// Save deduplication block ids with special prefix replace_partition
for (MutableDataPartPtr & part : dst_parts) if (!canReplacePartition(src_part))
renameTempPartAndReplace(part, nullptr, &transaction, data_parts_lock); throw Exception(
} "Cannot replace partition '" + partition_id + "' because part '" + src_part->name + "' has inconsistent granularity with table",
ErrorCodes::LOGICAL_ERROR);
Coordination::Error code = zookeeper->tryMulti(ops, op_results); String hash_hex = src_part->checksums.getTotalChecksumHex();
if (code == Coordination::Error::ZOK)
delimiting_block_lock->assumeUnlocked();
else if (code == Coordination::Error::ZBADVERSION)
throw Exception(ErrorCodes::CANNOT_ASSIGN_ALTER, "Cannot assign ALTER PARTITION, because another ALTER PARTITION query was concurrently executed");
else
zkutil::KeeperMultiException::check(code, ops, op_results);
{
auto data_parts_lock = lockParts();
transaction.commit(&data_parts_lock);
if (replace) if (replace)
parts_to_remove = removePartsInRangeFromWorkingSet(drop_range, true, data_parts_lock); LOG_INFO(log, "Trying to replace {} with hash_hex {}", src_part->name, hash_hex);
else
LOG_INFO(log, "Trying to attach {} with hash_hex {}", src_part->name, hash_hex);
String block_id_path = replace ? "" : (fs::path(zookeeper_path) / "blocks" / (partition_id + "_replace_from_" + hash_hex));
auto lock = allocateBlockNumber(partition_id, zookeeper, block_id_path);
if (!lock)
{
LOG_INFO(log, "Part {} (hash {}) has been already attached", src_part->name, hash_hex);
continue;
}
UInt64 index = lock->getNumber();
MergeTreePartInfo dst_part_info(partition_id, index, index, src_part->info.level);
auto dst_part = cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, metadata_snapshot);
src_parts.emplace_back(src_part);
dst_parts.emplace_back(dst_part);
ephemeral_locks.emplace_back(std::move(*lock));
block_id_paths.emplace_back(block_id_path);
part_checksums.emplace_back(hash_hex);
} }
PartLog::addNewParts(getContext(), dst_parts, watch.elapsed()); ReplicatedMergeTreeLogEntryData entry;
} {
catch (...) auto src_table_id = src_data.getStorageID();
{ entry.type = ReplicatedMergeTreeLogEntryData::REPLACE_RANGE;
PartLog::addNewParts(getContext(), dst_parts, watch.elapsed(), ExecutionStatus::fromCurrentException()); entry.source_replica = replica_name;
throw; entry.create_time = time(nullptr);
entry.replace_range_entry = std::make_shared<ReplicatedMergeTreeLogEntryData::ReplaceRangeEntry>();
auto & entry_replace = *entry.replace_range_entry;
entry_replace.drop_range_part_name = drop_range_fake_part_name;
entry_replace.from_database = src_table_id.database_name;
entry_replace.from_table = src_table_id.table_name;
for (const auto & part : src_parts)
entry_replace.src_part_names.emplace_back(part->name);
for (const auto & part : dst_parts)
entry_replace.new_part_names.emplace_back(part->name);
for (const String & checksum : part_checksums)
entry_replace.part_names_checksums.emplace_back(checksum);
entry_replace.columns_version = -1;
}
/// Remove deduplication block_ids of replacing parts
if (replace)
clearBlocksInPartition(*zookeeper, drop_range.partition_id, drop_range.max_block, drop_range.max_block);
DataPartsVector parts_to_remove;
Coordination::Responses op_results;
try
{
Coordination::Requests ops;
for (size_t i = 0; i < dst_parts.size(); ++i)
{
getCommitPartOps(ops, dst_parts[i], block_id_paths[i]);
ephemeral_locks[i].getUnlockOps(ops);
if (ops.size() > zkutil::MULTI_BATCH_SIZE)
{
/// It is unnecessary to add parts to working set until we commit log entry
zookeeper->multi(ops);
ops.clear();
}
}
if (auto txn = query_context->getZooKeeperMetadataTransaction())
txn->moveOpsTo(ops);
delimiting_block_lock->getUnlockOps(ops);
/// Check and update version to avoid race with DROP_RANGE
ops.emplace_back(zkutil::makeSetRequest(alter_partition_version_path, "", alter_partition_version_stat.version));
/// Just update version, because merges assignment relies on it
ops.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "log", "", -1));
ops.emplace_back(zkutil::makeCreateRequest(fs::path(zookeeper_path) / "log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential));
Transaction transaction(*this);
{
auto data_parts_lock = lockParts();
for (MutableDataPartPtr & part : dst_parts)
renameTempPartAndReplace(part, nullptr, &transaction, data_parts_lock);
}
Coordination::Error code = zookeeper->tryMulti(ops, op_results);
if (code == Coordination::Error::ZOK)
delimiting_block_lock->assumeUnlocked();
else if (code == Coordination::Error::ZBADVERSION)
continue;
else
zkutil::KeeperMultiException::check(code, ops, op_results);
{
auto data_parts_lock = lockParts();
transaction.commit(&data_parts_lock);
if (replace)
parts_to_remove = removePartsInRangeFromWorkingSet(drop_range, true, data_parts_lock);
}
PartLog::addNewParts(getContext(), dst_parts, watch.elapsed());
}
catch (...)
{
PartLog::addNewParts(getContext(), dst_parts, watch.elapsed(), ExecutionStatus::fromCurrentException());
throw;
}
String log_znode_path = dynamic_cast<const Coordination::CreateResponse &>(*op_results.back()).path_created;
entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
for (auto & lock : ephemeral_locks)
lock.assumeUnlocked();
/// Forcibly remove replaced parts from ZooKeeper
tryRemovePartsFromZooKeeperWithRetries(parts_to_remove);
/// Speedup removing of replaced parts from filesystem
parts_to_remove.clear();
cleanup_thread.wakeup();
lock2.reset();
lock1.reset();
waitForLogEntryToBeProcessedIfNecessary(entry, query_context);
return;
} }
String log_znode_path = dynamic_cast<const Coordination::CreateResponse &>(*op_results.back()).path_created; throw Exception(
entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); ErrorCodes::CANNOT_ASSIGN_ALTER, "Cannot assign ALTER PARTITION, because another ALTER PARTITION query was concurrently executed");
for (auto & lock : ephemeral_locks)
lock.assumeUnlocked();
/// Forcibly remove replaced parts from ZooKeeper
tryRemovePartsFromZooKeeperWithRetries(parts_to_remove);
/// Speedup removing of replaced parts from filesystem
parts_to_remove.clear();
cleanup_thread.wakeup();
lock2.reset();
lock1.reset();
waitForLogEntryToBeProcessedIfNecessary(entry, query_context);
} }
void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, ContextPtr query_context) void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, ContextPtr query_context)
@ -6464,199 +6473,207 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
/// A range for log entry to remove parts from the source table (myself). /// A range for log entry to remove parts from the source table (myself).
auto zookeeper = getZooKeeper(); auto zookeeper = getZooKeeper();
String alter_partition_version_path = zookeeper_path + "/alter_partition_version"; /// Retry if alter_partition_version changes
Coordination::Stat alter_partition_version_stat; for (size_t retry = 0; retry < 1000; ++retry)
zookeeper->get(alter_partition_version_path, &alter_partition_version_stat);
MergeTreePartInfo drop_range;
std::optional<EphemeralLockInZooKeeper> delimiting_block_lock;
getFakePartCoveringAllPartsInPartition(partition_id, drop_range, delimiting_block_lock, true);
String drop_range_fake_part_name = getPartNamePossiblyFake(format_version, drop_range);
DataPartPtr covering_part;
DataPartsVector src_all_parts;
{ {
/// NOTE: Some covered parts may be missing in src_all_parts if corresponding log entries are not executed yet. String alter_partition_version_path = zookeeper_path + "/alter_partition_version";
auto parts_lock = src_data.lockParts(); Coordination::Stat alter_partition_version_stat;
src_all_parts = src_data.getActivePartsToReplace(drop_range, drop_range_fake_part_name, covering_part, parts_lock); zookeeper->get(alter_partition_version_path, &alter_partition_version_stat);
}
if (covering_part) MergeTreePartInfo drop_range;
throw Exception(ErrorCodes::LOGICAL_ERROR, "Got part {} covering drop range {}, it's a bug", std::optional<EphemeralLockInZooKeeper> delimiting_block_lock;
covering_part->name, drop_range_fake_part_name); getFakePartCoveringAllPartsInPartition(partition_id, drop_range, delimiting_block_lock, true);
String drop_range_fake_part_name = getPartNamePossiblyFake(format_version, drop_range);
/// After allocating block number for drop_range we must ensure that it does not intersect block numbers DataPartPtr covering_part;
/// allocated by concurrent REPLACE query. DataPartsVector src_all_parts;
/// We could check it in multi-request atomically with creation of DROP_RANGE entry in source table log,
/// but it's better to check it here and fail as early as possible (before we have done something to destination table).
Coordination::Error version_check_code = zookeeper->trySet(alter_partition_version_path, "", alter_partition_version_stat.version);
if (version_check_code != Coordination::Error::ZOK)
throw Exception(ErrorCodes::CANNOT_ASSIGN_ALTER, "Cannot DROP PARTITION in {} after copying partition to {}, "
"because another ALTER PARTITION query was concurrently executed",
getStorageID().getFullTableName(), dest_table_storage->getStorageID().getFullTableName());
DataPartsVector src_parts;
MutableDataPartsVector dst_parts;
Strings block_id_paths;
Strings part_checksums;
std::vector<EphemeralLockInZooKeeper> ephemeral_locks;
LOG_DEBUG(log, "Cloning {} parts", src_all_parts.size());
static const String TMP_PREFIX = "tmp_move_from_";
/// Clone parts into destination table.
String dest_alter_partition_version_path = dest_table_storage->zookeeper_path + "/alter_partition_version";
Coordination::Stat dest_alter_partition_version_stat;
zookeeper->get(dest_alter_partition_version_path, &dest_alter_partition_version_stat);
for (const auto & src_part : src_all_parts)
{
if (!dest_table_storage->canReplacePartition(src_part))
throw Exception(
"Cannot move partition '" + partition_id + "' because part '" + src_part->name + "' has inconsistent granularity with table",
ErrorCodes::LOGICAL_ERROR);
String hash_hex = src_part->checksums.getTotalChecksumHex();
String block_id_path;
auto lock = dest_table_storage->allocateBlockNumber(partition_id, zookeeper, block_id_path);
if (!lock)
{ {
LOG_INFO(log, "Part {} (hash {}) has been already attached", src_part->name, hash_hex); /// NOTE: Some covered parts may be missing in src_all_parts if corresponding log entries are not executed yet.
continue; auto parts_lock = src_data.lockParts();
src_all_parts = src_data.getActivePartsToReplace(drop_range, drop_range_fake_part_name, covering_part, parts_lock);
} }
UInt64 index = lock->getNumber(); if (covering_part)
MergeTreePartInfo dst_part_info(partition_id, index, index, src_part->info.level); throw Exception(ErrorCodes::LOGICAL_ERROR, "Got part {} covering drop range {}, it's a bug",
auto dst_part = dest_table_storage->cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, dest_metadata_snapshot); covering_part->name, drop_range_fake_part_name);
src_parts.emplace_back(src_part); /// After allocating block number for drop_range we must ensure that it does not intersect block numbers
dst_parts.emplace_back(dst_part); /// allocated by concurrent REPLACE query.
ephemeral_locks.emplace_back(std::move(*lock)); /// We could check it in multi-request atomically with creation of DROP_RANGE entry in source table log,
block_id_paths.emplace_back(block_id_path); /// but it's better to check it here and fail as early as possible (before we have done something to destination table).
part_checksums.emplace_back(hash_hex); Coordination::Error version_check_code = zookeeper->trySet(alter_partition_version_path, "", alter_partition_version_stat.version);
} if (version_check_code != Coordination::Error::ZOK)
throw Exception(ErrorCodes::CANNOT_ASSIGN_ALTER, "Cannot DROP PARTITION in {} after copying partition to {}, "
"because another ALTER PARTITION query was concurrently executed",
getStorageID().getFullTableName(), dest_table_storage->getStorageID().getFullTableName());
ReplicatedMergeTreeLogEntryData entry_delete; DataPartsVector src_parts;
{ MutableDataPartsVector dst_parts;
entry_delete.type = LogEntry::DROP_RANGE; Strings block_id_paths;
entry_delete.source_replica = replica_name; Strings part_checksums;
entry_delete.new_part_name = drop_range_fake_part_name; std::vector<EphemeralLockInZooKeeper> ephemeral_locks;
entry_delete.detach = false; //-V1048
entry_delete.create_time = time(nullptr);
}
ReplicatedMergeTreeLogEntryData entry; LOG_DEBUG(log, "Cloning {} parts", src_all_parts.size());
{
MergeTreePartInfo drop_range_dest = makeDummyDropRangeForMovePartitionOrAttachPartitionFrom(partition_id);
entry.type = ReplicatedMergeTreeLogEntryData::REPLACE_RANGE; static const String TMP_PREFIX = "tmp_move_from_";
entry.source_replica = dest_table_storage->replica_name;
entry.create_time = time(nullptr);
entry.replace_range_entry = std::make_shared<ReplicatedMergeTreeLogEntryData::ReplaceRangeEntry>();
auto & entry_replace = *entry.replace_range_entry; /// Clone parts into destination table.
entry_replace.drop_range_part_name = getPartNamePossiblyFake(format_version, drop_range_dest); String dest_alter_partition_version_path = dest_table_storage->zookeeper_path + "/alter_partition_version";
entry_replace.from_database = src_data_id.database_name; Coordination::Stat dest_alter_partition_version_stat;
entry_replace.from_table = src_data_id.table_name; zookeeper->get(dest_alter_partition_version_path, &dest_alter_partition_version_stat);
for (const auto & part : src_parts) for (const auto & src_part : src_all_parts)
entry_replace.src_part_names.emplace_back(part->name);
for (const auto & part : dst_parts)
entry_replace.new_part_names.emplace_back(part->name);
for (const String & checksum : part_checksums)
entry_replace.part_names_checksums.emplace_back(checksum);
entry_replace.columns_version = -1;
}
clearBlocksInPartition(*zookeeper, drop_range.partition_id, drop_range.max_block, drop_range.max_block);
DataPartsVector parts_to_remove;
Coordination::Responses op_results;
try
{
Coordination::Requests ops;
for (size_t i = 0; i < dst_parts.size(); ++i)
{ {
dest_table_storage->getCommitPartOps(ops, dst_parts[i], block_id_paths[i]); if (!dest_table_storage->canReplacePartition(src_part))
ephemeral_locks[i].getUnlockOps(ops); throw Exception(
"Cannot move partition '" + partition_id + "' because part '" + src_part->name + "' has inconsistent granularity with table",
ErrorCodes::LOGICAL_ERROR);
if (ops.size() > zkutil::MULTI_BATCH_SIZE) String hash_hex = src_part->checksums.getTotalChecksumHex();
String block_id_path;
auto lock = dest_table_storage->allocateBlockNumber(partition_id, zookeeper, block_id_path);
if (!lock)
{ {
zookeeper->multi(ops); LOG_INFO(log, "Part {} (hash {}) has been already attached", src_part->name, hash_hex);
ops.clear(); continue;
} }
UInt64 index = lock->getNumber();
MergeTreePartInfo dst_part_info(partition_id, index, index, src_part->info.level);
auto dst_part = dest_table_storage->cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, dest_metadata_snapshot);
src_parts.emplace_back(src_part);
dst_parts.emplace_back(dst_part);
ephemeral_locks.emplace_back(std::move(*lock));
block_id_paths.emplace_back(block_id_path);
part_checksums.emplace_back(hash_hex);
} }
/// Check and update version to avoid race with DROP_RANGE ReplicatedMergeTreeLogEntryData entry_delete;
ops.emplace_back(zkutil::makeSetRequest(dest_alter_partition_version_path, "", dest_alter_partition_version_stat.version));
/// Just update version, because merges assignment relies on it
ops.emplace_back(zkutil::makeSetRequest(fs::path(dest_table_storage->zookeeper_path) / "log", "", -1));
ops.emplace_back(zkutil::makeCreateRequest(fs::path(dest_table_storage->zookeeper_path) / "log/log-",
entry.toString(), zkutil::CreateMode::PersistentSequential));
{ {
Transaction transaction(*dest_table_storage); entry_delete.type = LogEntry::DROP_RANGE;
entry_delete.source_replica = replica_name;
auto src_data_parts_lock = lockParts(); entry_delete.new_part_name = drop_range_fake_part_name;
auto dest_data_parts_lock = dest_table_storage->lockParts(); entry_delete.detach = false; //-V1048
entry_delete.create_time = time(nullptr);
std::mutex mutex;
DataPartsLock lock(mutex);
for (MutableDataPartPtr & part : dst_parts)
dest_table_storage->renameTempPartAndReplace(part, nullptr, &transaction, lock);
Coordination::Error code = zookeeper->tryMulti(ops, op_results);
if (code == Coordination::Error::ZBADVERSION)
throw Exception(ErrorCodes::CANNOT_ASSIGN_ALTER, "Cannot assign ALTER PARTITION, because another ALTER PARTITION query was concurrently executed");
else
zkutil::KeeperMultiException::check(code, ops, op_results);
parts_to_remove = removePartsInRangeFromWorkingSet(drop_range, true, lock);
transaction.commit(&lock);
} }
PartLog::addNewParts(getContext(), dst_parts, watch.elapsed()); ReplicatedMergeTreeLogEntryData entry;
} {
catch (...) MergeTreePartInfo drop_range_dest = makeDummyDropRangeForMovePartitionOrAttachPartitionFrom(partition_id);
{
PartLog::addNewParts(getContext(), dst_parts, watch.elapsed(), ExecutionStatus::fromCurrentException()); entry.type = ReplicatedMergeTreeLogEntryData::REPLACE_RANGE;
throw; entry.source_replica = dest_table_storage->replica_name;
entry.create_time = time(nullptr);
entry.replace_range_entry = std::make_shared<ReplicatedMergeTreeLogEntryData::ReplaceRangeEntry>();
auto & entry_replace = *entry.replace_range_entry;
entry_replace.drop_range_part_name = getPartNamePossiblyFake(format_version, drop_range_dest);
entry_replace.from_database = src_data_id.database_name;
entry_replace.from_table = src_data_id.table_name;
for (const auto & part : src_parts)
entry_replace.src_part_names.emplace_back(part->name);
for (const auto & part : dst_parts)
entry_replace.new_part_names.emplace_back(part->name);
for (const String & checksum : part_checksums)
entry_replace.part_names_checksums.emplace_back(checksum);
entry_replace.columns_version = -1;
}
clearBlocksInPartition(*zookeeper, drop_range.partition_id, drop_range.max_block, drop_range.max_block);
DataPartsVector parts_to_remove;
Coordination::Responses op_results;
try
{
Coordination::Requests ops;
for (size_t i = 0; i < dst_parts.size(); ++i)
{
dest_table_storage->getCommitPartOps(ops, dst_parts[i], block_id_paths[i]);
ephemeral_locks[i].getUnlockOps(ops);
if (ops.size() > zkutil::MULTI_BATCH_SIZE)
{
zookeeper->multi(ops);
ops.clear();
}
}
/// Check and update version to avoid race with DROP_RANGE
ops.emplace_back(zkutil::makeSetRequest(dest_alter_partition_version_path, "", dest_alter_partition_version_stat.version));
/// Just update version, because merges assignment relies on it
ops.emplace_back(zkutil::makeSetRequest(fs::path(dest_table_storage->zookeeper_path) / "log", "", -1));
ops.emplace_back(zkutil::makeCreateRequest(fs::path(dest_table_storage->zookeeper_path) / "log/log-",
entry.toString(), zkutil::CreateMode::PersistentSequential));
{
Transaction transaction(*dest_table_storage);
auto src_data_parts_lock = lockParts();
auto dest_data_parts_lock = dest_table_storage->lockParts();
std::mutex mutex;
DataPartsLock lock(mutex);
for (MutableDataPartPtr & part : dst_parts)
dest_table_storage->renameTempPartAndReplace(part, nullptr, &transaction, lock);
Coordination::Error code = zookeeper->tryMulti(ops, op_results);
if (code == Coordination::Error::ZBADVERSION)
continue;
else
zkutil::KeeperMultiException::check(code, ops, op_results);
parts_to_remove = removePartsInRangeFromWorkingSet(drop_range, true, lock);
transaction.commit(&lock);
}
PartLog::addNewParts(getContext(), dst_parts, watch.elapsed());
}
catch (...)
{
PartLog::addNewParts(getContext(), dst_parts, watch.elapsed(), ExecutionStatus::fromCurrentException());
throw;
}
String log_znode_path = dynamic_cast<const Coordination::CreateResponse &>(*op_results.back()).path_created;
entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
for (auto & lock : ephemeral_locks)
lock.assumeUnlocked();
tryRemovePartsFromZooKeeperWithRetries(parts_to_remove);
parts_to_remove.clear();
cleanup_thread.wakeup();
lock2.reset();
dest_table_storage->waitForLogEntryToBeProcessedIfNecessary(entry, query_context);
/// Create DROP_RANGE for the source table
Coordination::Requests ops_src;
ops_src.emplace_back(zkutil::makeCreateRequest(
fs::path(zookeeper_path) / "log/log-", entry_delete.toString(), zkutil::CreateMode::PersistentSequential));
/// Just update version, because merges assignment relies on it
ops_src.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "log", "", -1));
delimiting_block_lock->getUnlockOps(ops_src);
op_results = zookeeper->multi(ops_src);
log_znode_path = dynamic_cast<const Coordination::CreateResponse &>(*op_results.front()).path_created;
entry_delete.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
lock1.reset();
waitForLogEntryToBeProcessedIfNecessary(entry_delete, query_context);
/// Cleaning possibly stored information about parts from /quorum/last_part node in ZooKeeper.
cleanLastPartNode(partition_id);
return;
} }
String log_znode_path = dynamic_cast<const Coordination::CreateResponse &>(*op_results.back()).path_created; throw Exception(ErrorCodes::CANNOT_ASSIGN_ALTER, "Cannot assign ALTER PARTITION, because another ALTER PARTITION query was concurrently executed");
entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
for (auto & lock : ephemeral_locks)
lock.assumeUnlocked();
tryRemovePartsFromZooKeeperWithRetries(parts_to_remove);
parts_to_remove.clear();
cleanup_thread.wakeup();
lock2.reset();
dest_table_storage->waitForLogEntryToBeProcessedIfNecessary(entry, query_context);
/// Create DROP_RANGE for the source table
Coordination::Requests ops_src;
ops_src.emplace_back(zkutil::makeCreateRequest(
fs::path(zookeeper_path) / "log/log-", entry_delete.toString(), zkutil::CreateMode::PersistentSequential));
/// Just update version, because merges assignment relies on it
ops_src.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "log", "", -1));
delimiting_block_lock->getUnlockOps(ops_src);
op_results = zookeeper->multi(ops_src);
log_znode_path = dynamic_cast<const Coordination::CreateResponse &>(*op_results.front()).path_created;
entry_delete.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
lock1.reset();
waitForLogEntryToBeProcessedIfNecessary(entry_delete, query_context);
/// Cleaning possibly stored information about parts from /quorum/last_part node in ZooKeeper.
cleanLastPartNode(partition_id);
} }
void StorageReplicatedMergeTree::movePartitionToShard( void StorageReplicatedMergeTree::movePartitionToShard(
@ -6984,68 +7001,73 @@ bool StorageReplicatedMergeTree::dropPartImpl(
bool StorageReplicatedMergeTree::dropAllPartsInPartition( bool StorageReplicatedMergeTree::dropAllPartsInPartition(
zkutil::ZooKeeper & zookeeper, String & partition_id, LogEntry & entry, ContextPtr query_context, bool detach) zkutil::ZooKeeper & zookeeper, String & partition_id, LogEntry & entry, ContextPtr query_context, bool detach)
{ {
String alter_partition_version_path = zookeeper_path + "/alter_partition_version"; /// Retry if alter_partition_version changes
Coordination::Stat alter_partition_version_stat; for (size_t retry = 0; retry < 1000; ++retry)
zookeeper.get(alter_partition_version_path, &alter_partition_version_stat);
MergeTreePartInfo drop_range_info;
/// It would prevent other replicas from assigning merges which intersect locked block number.
std::optional<EphemeralLockInZooKeeper> delimiting_block_lock;
if (!getFakePartCoveringAllPartsInPartition(partition_id, drop_range_info, delimiting_block_lock))
{ {
LOG_INFO(log, "Will not drop partition {}, it is empty.", partition_id); String alter_partition_version_path = zookeeper_path + "/alter_partition_version";
return false; Coordination::Stat alter_partition_version_stat;
zookeeper.get(alter_partition_version_path, &alter_partition_version_stat);
MergeTreePartInfo drop_range_info;
/// It would prevent other replicas from assigning merges which intersect locked block number.
std::optional<EphemeralLockInZooKeeper> delimiting_block_lock;
if (!getFakePartCoveringAllPartsInPartition(partition_id, drop_range_info, delimiting_block_lock))
{
LOG_INFO(log, "Will not drop partition {}, it is empty.", partition_id);
return false;
}
clearBlocksInPartition(zookeeper, partition_id, drop_range_info.min_block, drop_range_info.max_block);
String drop_range_fake_part_name = getPartNamePossiblyFake(format_version, drop_range_info);
LOG_DEBUG(log, "Disabled merges covered by range {}", drop_range_fake_part_name);
/// Finally, having achieved the necessary invariants, you can put an entry in the log.
entry.type = LogEntry::DROP_RANGE;
entry.source_replica = replica_name;
entry.new_part_name = drop_range_fake_part_name;
entry.detach = detach;
entry.create_time = time(nullptr);
Coordination::Requests ops;
ops.emplace_back(zkutil::makeCreateRequest(fs::path(zookeeper_path) / "log/log-", entry.toString(),
zkutil::CreateMode::PersistentSequential));
/// Check and update version to avoid race with REPLACE_RANGE.
/// Otherwise new parts covered by drop_range_info may appear after execution of current DROP_RANGE entry
/// as a result of execution of concurrently created REPLACE_RANGE entry.
ops.emplace_back(zkutil::makeSetRequest(alter_partition_version_path, "", alter_partition_version_stat.version));
/// Just update version, because merges assignment relies on it
ops.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "log", "", -1));
delimiting_block_lock->getUnlockOps(ops);
if (auto txn = query_context->getZooKeeperMetadataTransaction())
txn->moveOpsTo(ops);
Coordination::Responses responses;
Coordination::Error code = zookeeper.tryMulti(ops, responses);
if (code == Coordination::Error::ZOK)
delimiting_block_lock->assumeUnlocked();
else if (code == Coordination::Error::ZBADVERSION)
continue;
else
zkutil::KeeperMultiException::check(code, ops, responses);
String log_znode_path = dynamic_cast<const Coordination::CreateResponse &>(*responses.front()).path_created;
entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
getContext()->getMergeList().cancelInPartition(getStorageID(), partition_id, drop_range_info.max_block);
return true;
} }
throw Exception(ErrorCodes::CANNOT_ASSIGN_ALTER,
clearBlocksInPartition(zookeeper, partition_id, drop_range_info.min_block, drop_range_info.max_block); "Cannot assign ALTER PARTITION because another ALTER PARTITION query was concurrently executed");
String drop_range_fake_part_name = getPartNamePossiblyFake(format_version, drop_range_info);
LOG_DEBUG(log, "Disabled merges covered by range {}", drop_range_fake_part_name);
/// Finally, having achieved the necessary invariants, you can put an entry in the log.
entry.type = LogEntry::DROP_RANGE;
entry.source_replica = replica_name;
entry.new_part_name = drop_range_fake_part_name;
entry.detach = detach;
entry.create_time = time(nullptr);
Coordination::Requests ops;
ops.emplace_back(zkutil::makeCreateRequest(fs::path(zookeeper_path) / "log/log-", entry.toString(),
zkutil::CreateMode::PersistentSequential));
/// Check and update version to avoid race with REPLACE_RANGE.
/// Otherwise new parts covered by drop_range_info may appear after execution of current DROP_RANGE entry
/// as a result of execution of concurrently created REPLACE_RANGE entry.
ops.emplace_back(zkutil::makeSetRequest(alter_partition_version_path, "", alter_partition_version_stat.version));
/// Just update version, because merges assignment relies on it
ops.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "log", "", -1));
delimiting_block_lock->getUnlockOps(ops);
if (auto txn = query_context->getZooKeeperMetadataTransaction())
txn->moveOpsTo(ops);
Coordination::Responses responses;
Coordination::Error code = zookeeper.tryMulti(ops, responses);
if (code == Coordination::Error::ZOK)
delimiting_block_lock->assumeUnlocked();
else if (code == Coordination::Error::ZBADVERSION)
throw Exception(ErrorCodes::CANNOT_ASSIGN_ALTER,
"Cannot assign ALTER PARTITION because another ALTER PARTITION query was concurrently executed");
else
zkutil::KeeperMultiException::check(code, ops, responses);
String log_znode_path = dynamic_cast<const Coordination::CreateResponse &>(*responses.front()).path_created;
entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
getContext()->getMergeList().cancelInPartition(getStorageID(), partition_id, drop_range_info.max_block);
return true;
} }