diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index b7528cf5ea7..8cbeb26f2c7 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -4974,9 +4974,175 @@ void StorageReplicatedMergeTree::replacePartitionFrom(const StoragePtr & source_ waitForAllReplicasToProcessLogEntry(entry); } -void StorageReplicatedMergeTree::movePartitionTo(const StoragePtr & /*source_table*/, const ASTPtr & /*partition*/, const Context & /*context*/) +void StorageReplicatedMergeTree::movePartitionTo(const StoragePtr & dest_table, const ASTPtr & partition, const Context & context) { - // TODO: Implement + auto dest_table_storage = std::dynamic_pointer_cast(dest_table); + if (!dest_table_storage) + throw Exception("Table " + this->getTableName() + " supports attachPartitionFrom only for MergeTree family of table engines." + " Got " + dest_table->getName(), ErrorCodes::NOT_IMPLEMENTED); + + auto lock1 = lockStructureForShare(false, context.getCurrentQueryId()); + auto lock2 = dest_table->lockStructureForShare(false, context.getCurrentQueryId()); + + Stopwatch watch; + MergeTreeData & src_data = dest_table_storage->checkStructureAndGetMergeTreeData(this); + String partition_id = getPartitionIDFromQuery(partition, context); + + DataPartsVector src_all_parts = src_data.getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id); + DataPartsVector src_parts; + MutableDataPartsVector dst_parts; + Strings block_id_paths; + Strings part_checksums; + std::vector ephemeral_locks; + + LOG_DEBUG(log, "Cloning " << src_all_parts.size() << " parts"); + + static const String TMP_PREFIX = "tmp_replace_from_"; + auto zookeeper = getZooKeeper(); + + /// Firstly, generate last block number and compute drop_range + /// NOTE: Even if we make ATTACH PARTITION instead of REPLACE PARTITION drop_range will not be empty, it will contain a block. + /// So, such case has special meaning, if drop_range contains only one block it means that nothing to drop. + MergeTreePartInfo drop_range; + drop_range.partition_id = partition_id; + drop_range.max_block = allocateBlockNumber(partition_id, zookeeper)->getNumber(); + drop_range.min_block = 0; + drop_range.level = std::numeric_limits::max(); + + String drop_range_fake_part_name = getPartNamePossiblyFake(format_version, drop_range); + + if (drop_range.getBlocksCount() > 1) + { + /// We have to prohibit merges in drop_range, since new merge log entry appeared after this REPLACE FROM entry + /// could produce new merged part instead in place of just deleted parts. + /// It is better to prohibit them on leader replica (like DROP PARTITION makes), + /// but it is inconvenient for a user since he could actually use source table from this replica. + /// Therefore prohibit merges on the initializer server now and on the remaining servers when log entry will be executed. + /// It does not provides strong guarantees, but is suitable for intended use case (assume merges are quite rare). + + { + std::lock_guard merge_selecting_lock(merge_selecting_mutex); + queue.disableMergesInRange(drop_range_fake_part_name); + } + } + + for (size_t i = 0; i < src_all_parts.size(); ++i) + { + /// We also make some kind of deduplication to avoid duplicated parts in case of ATTACH PARTITION + /// Assume that merges in the partition are quite rare + /// Save deduplication block ids with special prefix replace_partition + + auto & src_part = src_all_parts[i]; + + if (!dest_table_storage->canReplacePartition(src_part)) + throw Exception( + "Cannot replace partition '" + partition_id + "' because part '" + src_part->name + "' has inconsistent granularity with table", + ErrorCodes::LOGICAL_ERROR); + + String hash_hex = src_part->checksums.getTotalChecksumHex(); + String block_id_path = "" ; + + auto lock = dest_table_storage->allocateBlockNumber(partition_id, zookeeper, block_id_path); + if (!lock) + { + LOG_INFO(log, "Part " << src_part->name << " (hash " << hash_hex << ") has been already attached"); + continue; + } + + UInt64 index = lock->getNumber(); + MergeTreePartInfo dst_part_info(partition_id, index, index, src_part->info.level); + auto dst_part = dest_table_storage->cloneAndLoadDataPart(src_part, TMP_PREFIX, dst_part_info); + + src_parts.emplace_back(src_part); + dst_parts.emplace_back(dst_part); + ephemeral_locks.emplace_back(std::move(*lock)); + block_id_paths.emplace_back(block_id_path); + part_checksums.emplace_back(hash_hex); + } + + ReplicatedMergeTreeLogEntryData entry; + { + entry.type = ReplicatedMergeTreeLogEntryData::REPLACE_RANGE; + entry.source_replica = replica_name; + entry.create_time = time(nullptr); + entry.replace_range_entry = std::make_shared(); + + auto & entry_replace = *entry.replace_range_entry; + entry_replace.drop_range_part_name = drop_range_fake_part_name; + entry_replace.from_database = src_data.database_name; + entry_replace.from_table = src_data.table_name; + for (const auto & part : src_parts) + entry_replace.src_part_names.emplace_back(part->name); + for (const auto & part : dst_parts) + entry_replace.new_part_names.emplace_back(part->name); + for (const String & checksum : part_checksums) + entry_replace.part_names_checksums.emplace_back(checksum); + entry_replace.columns_version = columns_version; + } + + /// We are almost ready to commit changes, remove fetches and merges from drop range + queue.removePartProducingOpsInRange(zookeeper, drop_range, entry); + + /// Remove deduplication block_ids of replacing parts + dest_table_storage->clearBlocksInPartition(*zookeeper, drop_range.partition_id, drop_range.max_block, drop_range.max_block); + + DataPartsVector parts_to_remove; + Coordination::Responses op_results; + + try + { + Coordination::Requests ops; + for (size_t i = 0; i < dst_parts.size(); ++i) + { + dest_table_storage->getCommitPartOps(ops, dst_parts[i], block_id_paths[i]); + ephemeral_locks[i].getUnlockOps(ops); + + if (ops.size() > zkutil::MULTI_BATCH_SIZE) + { + /// It is unnecessary to add parts to working set until we commit log entry + zookeeper->multi(ops); + ops.clear(); + } + } + + ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential)); + + { + Transaction transaction(*dest_table_storage); + auto data_parts_lock = lockParts(); + + for (MutableDataPartPtr & part : dst_parts) + dest_table_storage->renameTempPartAndReplace(part, nullptr, &transaction, data_parts_lock); + + transaction.commit(&data_parts_lock); + parts_to_remove = removePartsInRangeFromWorkingSet(drop_range, true, false, data_parts_lock); + } + + op_results = zookeeper->multi(ops); + PartLog::addNewParts(global_context, dst_parts, watch.elapsed()); + } + catch (...) + { + PartLog::addNewParts(global_context, dst_parts, watch.elapsed(), ExecutionStatus::fromCurrentException()); + throw; + } + + String log_znode_path = dynamic_cast(*op_results.back()).path_created; + entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); + + for (auto & lock : ephemeral_locks) + lock.assumeUnlocked(); + + /// Forcibly remove replaced parts from ZooKeeper + tryRemovePartsFromZooKeeperWithRetries(parts_to_remove); + + /// Speedup removing of replaced parts from filesystem + parts_to_remove.clear(); + cleanup_thread.wakeup(); + + /// If necessary, wait until the operation is performed on all replicas. + if (context.getSettingsRef().replication_alter_partitions_sync > 1) + waitForAllReplicasToProcessLogEntry(entry); } void StorageReplicatedMergeTree::getCommitPartOps(