diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 63bb8af9148..d3219c52ef3 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -6257,186 +6257,195 @@ void StorageReplicatedMergeTree::replacePartitionFrom( /// NOTE: Some covered parts may be missing in src_all_parts if corresponding log entries are not executed yet. DataPartsVector src_all_parts = src_data.getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id); - DataPartsVector src_parts; - MutableDataPartsVector dst_parts; - Strings block_id_paths; - Strings part_checksums; - auto zookeeper = getZooKeeper(); - std::vector ephemeral_locks; LOG_DEBUG(log, "Cloning {} parts", src_all_parts.size()); static const String TMP_PREFIX = "tmp_replace_from_"; + auto zookeeper = getZooKeeper(); - String alter_partition_version_path = zookeeper_path + "/alter_partition_version"; - Coordination::Stat alter_partition_version_stat; - zookeeper->get(alter_partition_version_path, &alter_partition_version_stat); - - /// Firstly, generate last block number and compute drop_range - /// NOTE: Even if we make ATTACH PARTITION instead of REPLACE PARTITION drop_range will not be empty, it will contain a block. - /// So, such case has special meaning, if drop_range contains only one block it means that nothing to drop. - /// TODO why not to add normal DROP_RANGE entry to replication queue if `replace` is true? - MergeTreePartInfo drop_range; - std::optional delimiting_block_lock; - bool partition_was_empty = !getFakePartCoveringAllPartsInPartition(partition_id, drop_range, delimiting_block_lock, true); - if (replace && partition_was_empty) + /// Retry if alter_partition_version changes + for (size_t retry = 0; retry < 1000; ++retry) { - /// Nothing to drop, will just attach new parts - LOG_INFO(log, "Partition {} was empty, REPLACE PARTITION will work as ATTACH PARTITION FROM", drop_range.partition_id); - replace = false; - } + DataPartsVector src_parts; + MutableDataPartsVector dst_parts; + Strings block_id_paths; + Strings part_checksums; + std::vector ephemeral_locks; + String alter_partition_version_path = zookeeper_path + "/alter_partition_version"; + Coordination::Stat alter_partition_version_stat; + zookeeper->get(alter_partition_version_path, &alter_partition_version_stat); - if (!replace) - { - /// It's ATTACH PARTITION FROM, not REPLACE PARTITION. We have to reset drop range - drop_range = makeDummyDropRangeForMovePartitionOrAttachPartitionFrom(partition_id); - } - - assert(replace == !LogEntry::ReplaceRangeEntry::isMovePartitionOrAttachFrom(drop_range)); - - String drop_range_fake_part_name = getPartNamePossiblyFake(format_version, drop_range); - - for (const auto & src_part : src_all_parts) - { - /// We also make some kind of deduplication to avoid duplicated parts in case of ATTACH PARTITION - /// Assume that merges in the partition are quite rare - /// Save deduplication block ids with special prefix replace_partition - - if (!canReplacePartition(src_part)) - throw Exception( - "Cannot replace partition '" + partition_id + "' because part '" + src_part->name + "' has inconsistent granularity with table", - ErrorCodes::LOGICAL_ERROR); - - String hash_hex = src_part->checksums.getTotalChecksumHex(); - - if (replace) - LOG_INFO(log, "Trying to replace {} with hash_hex {}", src_part->name, hash_hex); - else - LOG_INFO(log, "Trying to attach {} with hash_hex {}", src_part->name, hash_hex); - - String block_id_path = replace ? "" : (fs::path(zookeeper_path) / "blocks" / (partition_id + "_replace_from_" + hash_hex)); - - auto lock = allocateBlockNumber(partition_id, zookeeper, block_id_path); - if (!lock) + /// Firstly, generate last block number and compute drop_range + /// NOTE: Even if we make ATTACH PARTITION instead of REPLACE PARTITION drop_range will not be empty, it will contain a block. + /// So, such case has special meaning, if drop_range contains only one block it means that nothing to drop. + /// TODO why not to add normal DROP_RANGE entry to replication queue if `replace` is true? + MergeTreePartInfo drop_range; + std::optional delimiting_block_lock; + bool partition_was_empty = !getFakePartCoveringAllPartsInPartition(partition_id, drop_range, delimiting_block_lock, true); + if (replace && partition_was_empty) { - LOG_INFO(log, "Part {} (hash {}) has been already attached", src_part->name, hash_hex); - continue; + /// Nothing to drop, will just attach new parts + LOG_INFO(log, "Partition {} was empty, REPLACE PARTITION will work as ATTACH PARTITION FROM", drop_range.partition_id); + replace = false; } - UInt64 index = lock->getNumber(); - MergeTreePartInfo dst_part_info(partition_id, index, index, src_part->info.level); - auto dst_part = cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, metadata_snapshot); - - src_parts.emplace_back(src_part); - dst_parts.emplace_back(dst_part); - ephemeral_locks.emplace_back(std::move(*lock)); - block_id_paths.emplace_back(block_id_path); - part_checksums.emplace_back(hash_hex); - } - - ReplicatedMergeTreeLogEntryData entry; - { - auto src_table_id = src_data.getStorageID(); - entry.type = ReplicatedMergeTreeLogEntryData::REPLACE_RANGE; - entry.source_replica = replica_name; - entry.create_time = time(nullptr); - entry.replace_range_entry = std::make_shared(); - - auto & entry_replace = *entry.replace_range_entry; - entry_replace.drop_range_part_name = drop_range_fake_part_name; - entry_replace.from_database = src_table_id.database_name; - entry_replace.from_table = src_table_id.table_name; - for (const auto & part : src_parts) - entry_replace.src_part_names.emplace_back(part->name); - for (const auto & part : dst_parts) - entry_replace.new_part_names.emplace_back(part->name); - for (const String & checksum : part_checksums) - entry_replace.part_names_checksums.emplace_back(checksum); - entry_replace.columns_version = -1; - } - - /// Remove deduplication block_ids of replacing parts - if (replace) - clearBlocksInPartition(*zookeeper, drop_range.partition_id, drop_range.max_block, drop_range.max_block); - - DataPartsVector parts_to_remove; - Coordination::Responses op_results; - - try - { - Coordination::Requests ops; - for (size_t i = 0; i < dst_parts.size(); ++i) + if (!replace) { - getCommitPartOps(ops, dst_parts[i], block_id_paths[i]); - ephemeral_locks[i].getUnlockOps(ops); - - if (ops.size() > zkutil::MULTI_BATCH_SIZE) - { - /// It is unnecessary to add parts to working set until we commit log entry - zookeeper->multi(ops); - ops.clear(); - } + /// It's ATTACH PARTITION FROM, not REPLACE PARTITION. We have to reset drop range + drop_range = makeDummyDropRangeForMovePartitionOrAttachPartitionFrom(partition_id); } - if (auto txn = query_context->getZooKeeperMetadataTransaction()) - txn->moveOpsTo(ops); + assert(replace == !LogEntry::ReplaceRangeEntry::isMovePartitionOrAttachFrom(drop_range)); - delimiting_block_lock->getUnlockOps(ops); - /// Check and update version to avoid race with DROP_RANGE - ops.emplace_back(zkutil::makeSetRequest(alter_partition_version_path, "", alter_partition_version_stat.version)); - /// Just update version, because merges assignment relies on it - ops.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "log", "", -1)); - ops.emplace_back(zkutil::makeCreateRequest(fs::path(zookeeper_path) / "log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential)); + String drop_range_fake_part_name = getPartNamePossiblyFake(format_version, drop_range); - Transaction transaction(*this); + for (const auto & src_part : src_all_parts) { - auto data_parts_lock = lockParts(); + /// We also make some kind of deduplication to avoid duplicated parts in case of ATTACH PARTITION + /// Assume that merges in the partition are quite rare + /// Save deduplication block ids with special prefix replace_partition - for (MutableDataPartPtr & part : dst_parts) - renameTempPartAndReplace(part, nullptr, &transaction, data_parts_lock); - } + if (!canReplacePartition(src_part)) + throw Exception( + "Cannot replace partition '" + partition_id + "' because part '" + src_part->name + "' has inconsistent granularity with table", + ErrorCodes::LOGICAL_ERROR); - Coordination::Error code = zookeeper->tryMulti(ops, op_results); - if (code == Coordination::Error::ZOK) - delimiting_block_lock->assumeUnlocked(); - else if (code == Coordination::Error::ZBADVERSION) - throw Exception(ErrorCodes::CANNOT_ASSIGN_ALTER, "Cannot assign ALTER PARTITION, because another ALTER PARTITION query was concurrently executed"); - else - zkutil::KeeperMultiException::check(code, ops, op_results); + String hash_hex = src_part->checksums.getTotalChecksumHex(); - { - auto data_parts_lock = lockParts(); - - transaction.commit(&data_parts_lock); if (replace) - parts_to_remove = removePartsInRangeFromWorkingSet(drop_range, true, data_parts_lock); + LOG_INFO(log, "Trying to replace {} with hash_hex {}", src_part->name, hash_hex); + else + LOG_INFO(log, "Trying to attach {} with hash_hex {}", src_part->name, hash_hex); + + String block_id_path = replace ? "" : (fs::path(zookeeper_path) / "blocks" / (partition_id + "_replace_from_" + hash_hex)); + + auto lock = allocateBlockNumber(partition_id, zookeeper, block_id_path); + if (!lock) + { + LOG_INFO(log, "Part {} (hash {}) has been already attached", src_part->name, hash_hex); + continue; + } + + UInt64 index = lock->getNumber(); + MergeTreePartInfo dst_part_info(partition_id, index, index, src_part->info.level); + auto dst_part = cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, metadata_snapshot); + + src_parts.emplace_back(src_part); + dst_parts.emplace_back(dst_part); + ephemeral_locks.emplace_back(std::move(*lock)); + block_id_paths.emplace_back(block_id_path); + part_checksums.emplace_back(hash_hex); } - PartLog::addNewParts(getContext(), dst_parts, watch.elapsed()); - } - catch (...) - { - PartLog::addNewParts(getContext(), dst_parts, watch.elapsed(), ExecutionStatus::fromCurrentException()); - throw; + ReplicatedMergeTreeLogEntryData entry; + { + auto src_table_id = src_data.getStorageID(); + entry.type = ReplicatedMergeTreeLogEntryData::REPLACE_RANGE; + entry.source_replica = replica_name; + entry.create_time = time(nullptr); + entry.replace_range_entry = std::make_shared(); + + auto & entry_replace = *entry.replace_range_entry; + entry_replace.drop_range_part_name = drop_range_fake_part_name; + entry_replace.from_database = src_table_id.database_name; + entry_replace.from_table = src_table_id.table_name; + for (const auto & part : src_parts) + entry_replace.src_part_names.emplace_back(part->name); + for (const auto & part : dst_parts) + entry_replace.new_part_names.emplace_back(part->name); + for (const String & checksum : part_checksums) + entry_replace.part_names_checksums.emplace_back(checksum); + entry_replace.columns_version = -1; + } + + /// Remove deduplication block_ids of replacing parts + if (replace) + clearBlocksInPartition(*zookeeper, drop_range.partition_id, drop_range.max_block, drop_range.max_block); + + DataPartsVector parts_to_remove; + Coordination::Responses op_results; + + try + { + Coordination::Requests ops; + for (size_t i = 0; i < dst_parts.size(); ++i) + { + getCommitPartOps(ops, dst_parts[i], block_id_paths[i]); + ephemeral_locks[i].getUnlockOps(ops); + + if (ops.size() > zkutil::MULTI_BATCH_SIZE) + { + /// It is unnecessary to add parts to working set until we commit log entry + zookeeper->multi(ops); + ops.clear(); + } + } + + if (auto txn = query_context->getZooKeeperMetadataTransaction()) + txn->moveOpsTo(ops); + + delimiting_block_lock->getUnlockOps(ops); + /// Check and update version to avoid race with DROP_RANGE + ops.emplace_back(zkutil::makeSetRequest(alter_partition_version_path, "", alter_partition_version_stat.version)); + /// Just update version, because merges assignment relies on it + ops.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "log", "", -1)); + ops.emplace_back(zkutil::makeCreateRequest(fs::path(zookeeper_path) / "log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential)); + + Transaction transaction(*this); + { + auto data_parts_lock = lockParts(); + + for (MutableDataPartPtr & part : dst_parts) + renameTempPartAndReplace(part, nullptr, &transaction, data_parts_lock); + } + + Coordination::Error code = zookeeper->tryMulti(ops, op_results); + if (code == Coordination::Error::ZOK) + delimiting_block_lock->assumeUnlocked(); + else if (code == Coordination::Error::ZBADVERSION) + continue; + else + zkutil::KeeperMultiException::check(code, ops, op_results); + + { + auto data_parts_lock = lockParts(); + + transaction.commit(&data_parts_lock); + if (replace) + parts_to_remove = removePartsInRangeFromWorkingSet(drop_range, true, data_parts_lock); + } + + PartLog::addNewParts(getContext(), dst_parts, watch.elapsed()); + } + catch (...) + { + PartLog::addNewParts(getContext(), dst_parts, watch.elapsed(), ExecutionStatus::fromCurrentException()); + throw; + } + + String log_znode_path = dynamic_cast(*op_results.back()).path_created; + entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); + + for (auto & lock : ephemeral_locks) + lock.assumeUnlocked(); + + /// Forcibly remove replaced parts from ZooKeeper + tryRemovePartsFromZooKeeperWithRetries(parts_to_remove); + + /// Speedup removing of replaced parts from filesystem + parts_to_remove.clear(); + cleanup_thread.wakeup(); + + lock2.reset(); + lock1.reset(); + + waitForLogEntryToBeProcessedIfNecessary(entry, query_context); + + return; } - String log_znode_path = dynamic_cast(*op_results.back()).path_created; - entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); - - for (auto & lock : ephemeral_locks) - lock.assumeUnlocked(); - - /// Forcibly remove replaced parts from ZooKeeper - tryRemovePartsFromZooKeeperWithRetries(parts_to_remove); - - /// Speedup removing of replaced parts from filesystem - parts_to_remove.clear(); - cleanup_thread.wakeup(); - - lock2.reset(); - lock1.reset(); - - waitForLogEntryToBeProcessedIfNecessary(entry, query_context); + throw Exception( + ErrorCodes::CANNOT_ASSIGN_ALTER, "Cannot assign ALTER PARTITION, because another ALTER PARTITION query was concurrently executed"); } void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, ContextPtr query_context) @@ -6464,199 +6473,207 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta /// A range for log entry to remove parts from the source table (myself). auto zookeeper = getZooKeeper(); - String alter_partition_version_path = zookeeper_path + "/alter_partition_version"; - Coordination::Stat alter_partition_version_stat; - zookeeper->get(alter_partition_version_path, &alter_partition_version_stat); - - MergeTreePartInfo drop_range; - std::optional delimiting_block_lock; - getFakePartCoveringAllPartsInPartition(partition_id, drop_range, delimiting_block_lock, true); - String drop_range_fake_part_name = getPartNamePossiblyFake(format_version, drop_range); - - DataPartPtr covering_part; - DataPartsVector src_all_parts; + /// Retry if alter_partition_version changes + for (size_t retry = 0; retry < 1000; ++retry) { - /// NOTE: Some covered parts may be missing in src_all_parts if corresponding log entries are not executed yet. - auto parts_lock = src_data.lockParts(); - src_all_parts = src_data.getActivePartsToReplace(drop_range, drop_range_fake_part_name, covering_part, parts_lock); - } + String alter_partition_version_path = zookeeper_path + "/alter_partition_version"; + Coordination::Stat alter_partition_version_stat; + zookeeper->get(alter_partition_version_path, &alter_partition_version_stat); - if (covering_part) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Got part {} covering drop range {}, it's a bug", - covering_part->name, drop_range_fake_part_name); + MergeTreePartInfo drop_range; + std::optional delimiting_block_lock; + getFakePartCoveringAllPartsInPartition(partition_id, drop_range, delimiting_block_lock, true); + String drop_range_fake_part_name = getPartNamePossiblyFake(format_version, drop_range); - /// After allocating block number for drop_range we must ensure that it does not intersect block numbers - /// allocated by concurrent REPLACE query. - /// We could check it in multi-request atomically with creation of DROP_RANGE entry in source table log, - /// but it's better to check it here and fail as early as possible (before we have done something to destination table). - Coordination::Error version_check_code = zookeeper->trySet(alter_partition_version_path, "", alter_partition_version_stat.version); - if (version_check_code != Coordination::Error::ZOK) - throw Exception(ErrorCodes::CANNOT_ASSIGN_ALTER, "Cannot DROP PARTITION in {} after copying partition to {}, " - "because another ALTER PARTITION query was concurrently executed", - getStorageID().getFullTableName(), dest_table_storage->getStorageID().getFullTableName()); - - DataPartsVector src_parts; - MutableDataPartsVector dst_parts; - Strings block_id_paths; - Strings part_checksums; - std::vector ephemeral_locks; - - LOG_DEBUG(log, "Cloning {} parts", src_all_parts.size()); - - static const String TMP_PREFIX = "tmp_move_from_"; - - /// Clone parts into destination table. - String dest_alter_partition_version_path = dest_table_storage->zookeeper_path + "/alter_partition_version"; - Coordination::Stat dest_alter_partition_version_stat; - zookeeper->get(dest_alter_partition_version_path, &dest_alter_partition_version_stat); - for (const auto & src_part : src_all_parts) - { - if (!dest_table_storage->canReplacePartition(src_part)) - throw Exception( - "Cannot move partition '" + partition_id + "' because part '" + src_part->name + "' has inconsistent granularity with table", - ErrorCodes::LOGICAL_ERROR); - - String hash_hex = src_part->checksums.getTotalChecksumHex(); - String block_id_path; - - auto lock = dest_table_storage->allocateBlockNumber(partition_id, zookeeper, block_id_path); - if (!lock) + DataPartPtr covering_part; + DataPartsVector src_all_parts; { - LOG_INFO(log, "Part {} (hash {}) has been already attached", src_part->name, hash_hex); - continue; + /// NOTE: Some covered parts may be missing in src_all_parts if corresponding log entries are not executed yet. + auto parts_lock = src_data.lockParts(); + src_all_parts = src_data.getActivePartsToReplace(drop_range, drop_range_fake_part_name, covering_part, parts_lock); } - UInt64 index = lock->getNumber(); - MergeTreePartInfo dst_part_info(partition_id, index, index, src_part->info.level); - auto dst_part = dest_table_storage->cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, dest_metadata_snapshot); + if (covering_part) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Got part {} covering drop range {}, it's a bug", + covering_part->name, drop_range_fake_part_name); - src_parts.emplace_back(src_part); - dst_parts.emplace_back(dst_part); - ephemeral_locks.emplace_back(std::move(*lock)); - block_id_paths.emplace_back(block_id_path); - part_checksums.emplace_back(hash_hex); - } + /// After allocating block number for drop_range we must ensure that it does not intersect block numbers + /// allocated by concurrent REPLACE query. + /// We could check it in multi-request atomically with creation of DROP_RANGE entry in source table log, + /// but it's better to check it here and fail as early as possible (before we have done something to destination table). + Coordination::Error version_check_code = zookeeper->trySet(alter_partition_version_path, "", alter_partition_version_stat.version); + if (version_check_code != Coordination::Error::ZOK) + throw Exception(ErrorCodes::CANNOT_ASSIGN_ALTER, "Cannot DROP PARTITION in {} after copying partition to {}, " + "because another ALTER PARTITION query was concurrently executed", + getStorageID().getFullTableName(), dest_table_storage->getStorageID().getFullTableName()); - ReplicatedMergeTreeLogEntryData entry_delete; - { - entry_delete.type = LogEntry::DROP_RANGE; - entry_delete.source_replica = replica_name; - entry_delete.new_part_name = drop_range_fake_part_name; - entry_delete.detach = false; //-V1048 - entry_delete.create_time = time(nullptr); - } + DataPartsVector src_parts; + MutableDataPartsVector dst_parts; + Strings block_id_paths; + Strings part_checksums; + std::vector ephemeral_locks; - ReplicatedMergeTreeLogEntryData entry; - { - MergeTreePartInfo drop_range_dest = makeDummyDropRangeForMovePartitionOrAttachPartitionFrom(partition_id); + LOG_DEBUG(log, "Cloning {} parts", src_all_parts.size()); - entry.type = ReplicatedMergeTreeLogEntryData::REPLACE_RANGE; - entry.source_replica = dest_table_storage->replica_name; - entry.create_time = time(nullptr); - entry.replace_range_entry = std::make_shared(); + static const String TMP_PREFIX = "tmp_move_from_"; - auto & entry_replace = *entry.replace_range_entry; - entry_replace.drop_range_part_name = getPartNamePossiblyFake(format_version, drop_range_dest); - entry_replace.from_database = src_data_id.database_name; - entry_replace.from_table = src_data_id.table_name; - for (const auto & part : src_parts) - entry_replace.src_part_names.emplace_back(part->name); - for (const auto & part : dst_parts) - entry_replace.new_part_names.emplace_back(part->name); - for (const String & checksum : part_checksums) - entry_replace.part_names_checksums.emplace_back(checksum); - entry_replace.columns_version = -1; - } - - clearBlocksInPartition(*zookeeper, drop_range.partition_id, drop_range.max_block, drop_range.max_block); - - DataPartsVector parts_to_remove; - Coordination::Responses op_results; - - try - { - Coordination::Requests ops; - for (size_t i = 0; i < dst_parts.size(); ++i) + /// Clone parts into destination table. + String dest_alter_partition_version_path = dest_table_storage->zookeeper_path + "/alter_partition_version"; + Coordination::Stat dest_alter_partition_version_stat; + zookeeper->get(dest_alter_partition_version_path, &dest_alter_partition_version_stat); + for (const auto & src_part : src_all_parts) { - dest_table_storage->getCommitPartOps(ops, dst_parts[i], block_id_paths[i]); - ephemeral_locks[i].getUnlockOps(ops); + if (!dest_table_storage->canReplacePartition(src_part)) + throw Exception( + "Cannot move partition '" + partition_id + "' because part '" + src_part->name + "' has inconsistent granularity with table", + ErrorCodes::LOGICAL_ERROR); - if (ops.size() > zkutil::MULTI_BATCH_SIZE) + String hash_hex = src_part->checksums.getTotalChecksumHex(); + String block_id_path; + + auto lock = dest_table_storage->allocateBlockNumber(partition_id, zookeeper, block_id_path); + if (!lock) { - zookeeper->multi(ops); - ops.clear(); + LOG_INFO(log, "Part {} (hash {}) has been already attached", src_part->name, hash_hex); + continue; } + + UInt64 index = lock->getNumber(); + MergeTreePartInfo dst_part_info(partition_id, index, index, src_part->info.level); + auto dst_part = dest_table_storage->cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, dest_metadata_snapshot); + + src_parts.emplace_back(src_part); + dst_parts.emplace_back(dst_part); + ephemeral_locks.emplace_back(std::move(*lock)); + block_id_paths.emplace_back(block_id_path); + part_checksums.emplace_back(hash_hex); } - /// Check and update version to avoid race with DROP_RANGE - ops.emplace_back(zkutil::makeSetRequest(dest_alter_partition_version_path, "", dest_alter_partition_version_stat.version)); - /// Just update version, because merges assignment relies on it - ops.emplace_back(zkutil::makeSetRequest(fs::path(dest_table_storage->zookeeper_path) / "log", "", -1)); - ops.emplace_back(zkutil::makeCreateRequest(fs::path(dest_table_storage->zookeeper_path) / "log/log-", - entry.toString(), zkutil::CreateMode::PersistentSequential)); - + ReplicatedMergeTreeLogEntryData entry_delete; { - Transaction transaction(*dest_table_storage); - - auto src_data_parts_lock = lockParts(); - auto dest_data_parts_lock = dest_table_storage->lockParts(); - - std::mutex mutex; - DataPartsLock lock(mutex); - - for (MutableDataPartPtr & part : dst_parts) - dest_table_storage->renameTempPartAndReplace(part, nullptr, &transaction, lock); - - Coordination::Error code = zookeeper->tryMulti(ops, op_results); - if (code == Coordination::Error::ZBADVERSION) - throw Exception(ErrorCodes::CANNOT_ASSIGN_ALTER, "Cannot assign ALTER PARTITION, because another ALTER PARTITION query was concurrently executed"); - else - zkutil::KeeperMultiException::check(code, ops, op_results); - - parts_to_remove = removePartsInRangeFromWorkingSet(drop_range, true, lock); - transaction.commit(&lock); + entry_delete.type = LogEntry::DROP_RANGE; + entry_delete.source_replica = replica_name; + entry_delete.new_part_name = drop_range_fake_part_name; + entry_delete.detach = false; //-V1048 + entry_delete.create_time = time(nullptr); } - PartLog::addNewParts(getContext(), dst_parts, watch.elapsed()); - } - catch (...) - { - PartLog::addNewParts(getContext(), dst_parts, watch.elapsed(), ExecutionStatus::fromCurrentException()); - throw; + ReplicatedMergeTreeLogEntryData entry; + { + MergeTreePartInfo drop_range_dest = makeDummyDropRangeForMovePartitionOrAttachPartitionFrom(partition_id); + + entry.type = ReplicatedMergeTreeLogEntryData::REPLACE_RANGE; + entry.source_replica = dest_table_storage->replica_name; + entry.create_time = time(nullptr); + entry.replace_range_entry = std::make_shared(); + + auto & entry_replace = *entry.replace_range_entry; + entry_replace.drop_range_part_name = getPartNamePossiblyFake(format_version, drop_range_dest); + entry_replace.from_database = src_data_id.database_name; + entry_replace.from_table = src_data_id.table_name; + for (const auto & part : src_parts) + entry_replace.src_part_names.emplace_back(part->name); + for (const auto & part : dst_parts) + entry_replace.new_part_names.emplace_back(part->name); + for (const String & checksum : part_checksums) + entry_replace.part_names_checksums.emplace_back(checksum); + entry_replace.columns_version = -1; + } + + clearBlocksInPartition(*zookeeper, drop_range.partition_id, drop_range.max_block, drop_range.max_block); + + DataPartsVector parts_to_remove; + Coordination::Responses op_results; + + try + { + Coordination::Requests ops; + for (size_t i = 0; i < dst_parts.size(); ++i) + { + dest_table_storage->getCommitPartOps(ops, dst_parts[i], block_id_paths[i]); + ephemeral_locks[i].getUnlockOps(ops); + + if (ops.size() > zkutil::MULTI_BATCH_SIZE) + { + zookeeper->multi(ops); + ops.clear(); + } + } + + /// Check and update version to avoid race with DROP_RANGE + ops.emplace_back(zkutil::makeSetRequest(dest_alter_partition_version_path, "", dest_alter_partition_version_stat.version)); + /// Just update version, because merges assignment relies on it + ops.emplace_back(zkutil::makeSetRequest(fs::path(dest_table_storage->zookeeper_path) / "log", "", -1)); + ops.emplace_back(zkutil::makeCreateRequest(fs::path(dest_table_storage->zookeeper_path) / "log/log-", + entry.toString(), zkutil::CreateMode::PersistentSequential)); + + { + Transaction transaction(*dest_table_storage); + + auto src_data_parts_lock = lockParts(); + auto dest_data_parts_lock = dest_table_storage->lockParts(); + + std::mutex mutex; + DataPartsLock lock(mutex); + + for (MutableDataPartPtr & part : dst_parts) + dest_table_storage->renameTempPartAndReplace(part, nullptr, &transaction, lock); + + Coordination::Error code = zookeeper->tryMulti(ops, op_results); + if (code == Coordination::Error::ZBADVERSION) + continue; + else + zkutil::KeeperMultiException::check(code, ops, op_results); + + parts_to_remove = removePartsInRangeFromWorkingSet(drop_range, true, lock); + transaction.commit(&lock); + } + + PartLog::addNewParts(getContext(), dst_parts, watch.elapsed()); + } + catch (...) + { + PartLog::addNewParts(getContext(), dst_parts, watch.elapsed(), ExecutionStatus::fromCurrentException()); + throw; + } + + String log_znode_path = dynamic_cast(*op_results.back()).path_created; + entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); + + for (auto & lock : ephemeral_locks) + lock.assumeUnlocked(); + + tryRemovePartsFromZooKeeperWithRetries(parts_to_remove); + + parts_to_remove.clear(); + cleanup_thread.wakeup(); + lock2.reset(); + + dest_table_storage->waitForLogEntryToBeProcessedIfNecessary(entry, query_context); + + /// Create DROP_RANGE for the source table + Coordination::Requests ops_src; + ops_src.emplace_back(zkutil::makeCreateRequest( + fs::path(zookeeper_path) / "log/log-", entry_delete.toString(), zkutil::CreateMode::PersistentSequential)); + /// Just update version, because merges assignment relies on it + ops_src.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "log", "", -1)); + delimiting_block_lock->getUnlockOps(ops_src); + + op_results = zookeeper->multi(ops_src); + + log_znode_path = dynamic_cast(*op_results.front()).path_created; + entry_delete.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); + + lock1.reset(); + waitForLogEntryToBeProcessedIfNecessary(entry_delete, query_context); + + /// Cleaning possibly stored information about parts from /quorum/last_part node in ZooKeeper. + cleanLastPartNode(partition_id); + + return; } - String log_znode_path = dynamic_cast(*op_results.back()).path_created; - entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); - - for (auto & lock : ephemeral_locks) - lock.assumeUnlocked(); - - tryRemovePartsFromZooKeeperWithRetries(parts_to_remove); - - parts_to_remove.clear(); - cleanup_thread.wakeup(); - lock2.reset(); - - dest_table_storage->waitForLogEntryToBeProcessedIfNecessary(entry, query_context); - - /// Create DROP_RANGE for the source table - Coordination::Requests ops_src; - ops_src.emplace_back(zkutil::makeCreateRequest( - fs::path(zookeeper_path) / "log/log-", entry_delete.toString(), zkutil::CreateMode::PersistentSequential)); - /// Just update version, because merges assignment relies on it - ops_src.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "log", "", -1)); - delimiting_block_lock->getUnlockOps(ops_src); - - op_results = zookeeper->multi(ops_src); - - log_znode_path = dynamic_cast(*op_results.front()).path_created; - entry_delete.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); - - lock1.reset(); - waitForLogEntryToBeProcessedIfNecessary(entry_delete, query_context); - - /// Cleaning possibly stored information about parts from /quorum/last_part node in ZooKeeper. - cleanLastPartNode(partition_id); + throw Exception(ErrorCodes::CANNOT_ASSIGN_ALTER, "Cannot assign ALTER PARTITION, because another ALTER PARTITION query was concurrently executed"); } void StorageReplicatedMergeTree::movePartitionToShard( @@ -6984,68 +7001,73 @@ bool StorageReplicatedMergeTree::dropPartImpl( bool StorageReplicatedMergeTree::dropAllPartsInPartition( zkutil::ZooKeeper & zookeeper, String & partition_id, LogEntry & entry, ContextPtr query_context, bool detach) { - String alter_partition_version_path = zookeeper_path + "/alter_partition_version"; - Coordination::Stat alter_partition_version_stat; - zookeeper.get(alter_partition_version_path, &alter_partition_version_stat); - - MergeTreePartInfo drop_range_info; - - /// It would prevent other replicas from assigning merges which intersect locked block number. - std::optional delimiting_block_lock; - - if (!getFakePartCoveringAllPartsInPartition(partition_id, drop_range_info, delimiting_block_lock)) + /// Retry if alter_partition_version changes + for (size_t retry = 0; retry < 1000; ++retry) { - LOG_INFO(log, "Will not drop partition {}, it is empty.", partition_id); - return false; + String alter_partition_version_path = zookeeper_path + "/alter_partition_version"; + Coordination::Stat alter_partition_version_stat; + zookeeper.get(alter_partition_version_path, &alter_partition_version_stat); + + MergeTreePartInfo drop_range_info; + + /// It would prevent other replicas from assigning merges which intersect locked block number. + std::optional delimiting_block_lock; + + if (!getFakePartCoveringAllPartsInPartition(partition_id, drop_range_info, delimiting_block_lock)) + { + LOG_INFO(log, "Will not drop partition {}, it is empty.", partition_id); + return false; + } + + clearBlocksInPartition(zookeeper, partition_id, drop_range_info.min_block, drop_range_info.max_block); + + String drop_range_fake_part_name = getPartNamePossiblyFake(format_version, drop_range_info); + + LOG_DEBUG(log, "Disabled merges covered by range {}", drop_range_fake_part_name); + + /// Finally, having achieved the necessary invariants, you can put an entry in the log. + entry.type = LogEntry::DROP_RANGE; + entry.source_replica = replica_name; + entry.new_part_name = drop_range_fake_part_name; + entry.detach = detach; + entry.create_time = time(nullptr); + + Coordination::Requests ops; + + ops.emplace_back(zkutil::makeCreateRequest(fs::path(zookeeper_path) / "log/log-", entry.toString(), + zkutil::CreateMode::PersistentSequential)); + + /// Check and update version to avoid race with REPLACE_RANGE. + /// Otherwise new parts covered by drop_range_info may appear after execution of current DROP_RANGE entry + /// as a result of execution of concurrently created REPLACE_RANGE entry. + ops.emplace_back(zkutil::makeSetRequest(alter_partition_version_path, "", alter_partition_version_stat.version)); + + /// Just update version, because merges assignment relies on it + ops.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "log", "", -1)); + delimiting_block_lock->getUnlockOps(ops); + + if (auto txn = query_context->getZooKeeperMetadataTransaction()) + txn->moveOpsTo(ops); + + Coordination::Responses responses; + Coordination::Error code = zookeeper.tryMulti(ops, responses); + + if (code == Coordination::Error::ZOK) + delimiting_block_lock->assumeUnlocked(); + else if (code == Coordination::Error::ZBADVERSION) + continue; + else + zkutil::KeeperMultiException::check(code, ops, responses); + + String log_znode_path = dynamic_cast(*responses.front()).path_created; + entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); + + getContext()->getMergeList().cancelInPartition(getStorageID(), partition_id, drop_range_info.max_block); + + return true; } - - clearBlocksInPartition(zookeeper, partition_id, drop_range_info.min_block, drop_range_info.max_block); - - String drop_range_fake_part_name = getPartNamePossiblyFake(format_version, drop_range_info); - - LOG_DEBUG(log, "Disabled merges covered by range {}", drop_range_fake_part_name); - - /// Finally, having achieved the necessary invariants, you can put an entry in the log. - entry.type = LogEntry::DROP_RANGE; - entry.source_replica = replica_name; - entry.new_part_name = drop_range_fake_part_name; - entry.detach = detach; - entry.create_time = time(nullptr); - - Coordination::Requests ops; - - ops.emplace_back(zkutil::makeCreateRequest(fs::path(zookeeper_path) / "log/log-", entry.toString(), - zkutil::CreateMode::PersistentSequential)); - - /// Check and update version to avoid race with REPLACE_RANGE. - /// Otherwise new parts covered by drop_range_info may appear after execution of current DROP_RANGE entry - /// as a result of execution of concurrently created REPLACE_RANGE entry. - ops.emplace_back(zkutil::makeSetRequest(alter_partition_version_path, "", alter_partition_version_stat.version)); - - /// Just update version, because merges assignment relies on it - ops.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "log", "", -1)); - delimiting_block_lock->getUnlockOps(ops); - - if (auto txn = query_context->getZooKeeperMetadataTransaction()) - txn->moveOpsTo(ops); - - Coordination::Responses responses; - Coordination::Error code = zookeeper.tryMulti(ops, responses); - - if (code == Coordination::Error::ZOK) - delimiting_block_lock->assumeUnlocked(); - else if (code == Coordination::Error::ZBADVERSION) - throw Exception(ErrorCodes::CANNOT_ASSIGN_ALTER, - "Cannot assign ALTER PARTITION because another ALTER PARTITION query was concurrently executed"); - else - zkutil::KeeperMultiException::check(code, ops, responses); - - String log_znode_path = dynamic_cast(*responses.front()).path_created; - entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); - - getContext()->getMergeList().cancelInPartition(getStorageID(), partition_id, drop_range_info.max_block); - - return true; + throw Exception(ErrorCodes::CANNOT_ASSIGN_ALTER, + "Cannot assign ALTER PARTITION because another ALTER PARTITION query was concurrently executed"); }