diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 5efb6dfb879..f9f1b11d0b8 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -581,7 +581,7 @@ void StorageReplicatedMergeTree::createNewZooKeeperNodes() } /// For ALTER PARTITION with multi-leaders - zookeeper->createIfNotExists(zookeeper_path + "/drop_range_version", String()); + zookeeper->createIfNotExists(zookeeper_path + "/alter_partition_version", String()); } @@ -6217,9 +6217,9 @@ void StorageReplicatedMergeTree::replacePartitionFrom( static const String TMP_PREFIX = "tmp_replace_from_"; auto zookeeper = getZooKeeper(); - String drop_range_version_path = zookeeper_path + "/drop_range_version"; - Coordination::Stat drop_range_version_stat; - zookeeper->get(drop_range_version_path, &drop_range_version_stat); + String alter_partition_version_path = zookeeper_path + "/alter_partition_version"; + Coordination::Stat alter_partition_version_stat; + zookeeper->get(alter_partition_version_path, &alter_partition_version_stat); /// Firstly, generate last block number and compute drop_range /// NOTE: Even if we make ATTACH PARTITION instead of REPLACE PARTITION drop_range will not be empty, it will contain a block. @@ -6332,8 +6332,8 @@ void StorageReplicatedMergeTree::replacePartitionFrom( delimiting_block_lock->getUnlockOps(ops); /// Check and update version to avoid race with DROP_RANGE - ops.emplace_back(zkutil::makeCheckRequest(drop_range_version_path, drop_range_version_stat.version)); - ops.emplace_back(zkutil::makeSetRequest(drop_range_version_path, "", -1)); + ops.emplace_back(zkutil::makeCheckRequest(alter_partition_version_path, alter_partition_version_stat.version)); + ops.emplace_back(zkutil::makeSetRequest(alter_partition_version_path, "", -1)); /// Just update version, because merges assignment relies on it ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/log", "", -1)); ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential)); @@ -6436,9 +6436,9 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta /// Clone parts into destination table. - String drop_range_version_path = dest_table_storage->zookeeper_path + "/drop_range_version"; - Coordination::Stat drop_range_version_stat; - zookeeper->get(drop_range_version_path, &drop_range_version_stat); + String alter_partition_version_path = dest_table_storage->zookeeper_path + "/alter_partition_version"; + Coordination::Stat alter_partition_version_stat; + zookeeper->get(alter_partition_version_path, &alter_partition_version_stat); for (const auto & src_part : src_all_parts) { @@ -6520,8 +6520,8 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta } /// Check and update version to avoid race with DROP_RANGE - ops.emplace_back(zkutil::makeCheckRequest(drop_range_version_path, drop_range_version_stat.version)); - ops.emplace_back(zkutil::makeSetRequest(drop_range_version_path, "", -1)); + ops.emplace_back(zkutil::makeCheckRequest(alter_partition_version_path, alter_partition_version_stat.version)); + ops.emplace_back(zkutil::makeSetRequest(alter_partition_version_path, "", -1)); /// Just update version, because merges assignment relies on it ops.emplace_back(zkutil::makeSetRequest(dest_table_storage->zookeeper_path + "/log", "", -1)); ops.emplace_back(zkutil::makeCreateRequest(dest_table_storage->zookeeper_path + "/log/log-", @@ -6574,16 +6574,27 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta dest_table_storage->waitForAllReplicasToProcessLogEntry(entry); } - Coordination::Requests ops_src; + /// Create DROP_RANGE for the source table + alter_partition_version_path = zookeeper_path + "/alter_partition_version"; + zookeeper->get(alter_partition_version_path, &alter_partition_version_stat); + Coordination::Requests ops_src; ops_src.emplace_back(zkutil::makeCreateRequest( zookeeper_path + "/log/log-", entry_delete.toString(), zkutil::CreateMode::PersistentSequential)); + /// Check and update version to avoid race with REPLACE_RANGE + ops_src.emplace_back(zkutil::makeCheckRequest(alter_partition_version_path, alter_partition_version_stat.version)); + ops_src.emplace_back(zkutil::makeSetRequest(alter_partition_version_path, "", -1)); /// Just update version, because merges assignment relies on it ops_src.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/log", "", -1)); delimiting_block_lock->getUnlockOps(ops_src); - op_results = zookeeper->multi(ops_src); - delimiting_block_lock->assumeUnlocked(); + Coordination::Error code = zookeeper->tryMulti(ops_src, op_results); + if (code == Coordination::Error::ZBADVERSION) + throw Exception(ErrorCodes::CANNOT_ASSIGN_ALTER, "Cannot DROP PARTITION in {} after copying partition to {}, " + "because another ALTER PARTITION query was concurrently executed", + getStorageID().getFullTableName(), dest_table_storage->getStorageID().getFullTableName()); + else + zkutil::KeeperMultiException::check(code, ops_src, op_results); log_znode_path = dynamic_cast(*op_results.front()).path_created; entry_delete.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); @@ -6816,9 +6827,9 @@ bool StorageReplicatedMergeTree::dropPart( bool StorageReplicatedMergeTree::dropAllPartsInPartition( zkutil::ZooKeeper & zookeeper, String & partition_id, LogEntry & entry, ContextPtr query_context, bool detach) { - String drop_range_version_path = zookeeper_path + "/drop_range_version"; - Coordination::Stat drop_range_version_stat; - zookeeper.get(drop_range_version_path, &drop_range_version_stat); + String alter_partition_version_path = zookeeper_path + "/alter_partition_version"; + Coordination::Stat alter_partition_version_stat; + zookeeper.get(alter_partition_version_path, &alter_partition_version_stat); MergeTreePartInfo drop_range_info; /// It prevent other replicas from assigning merges which intersect locked block number. @@ -6847,8 +6858,8 @@ bool StorageReplicatedMergeTree::dropAllPartsInPartition( /// Check and update version to avoid race with REPLACE_RANGE. /// Otherwise new parts covered by drop_range_info may appear after execution of current DROP_RANGE entry /// as a result of execution of concurrently created REPLACE_RANGE entry. - ops.emplace_back(zkutil::makeCheckRequest(drop_range_version_path, drop_range_version_stat.version)); - ops.emplace_back(zkutil::makeSetRequest(drop_range_version_path, "", -1)); + ops.emplace_back(zkutil::makeCheckRequest(alter_partition_version_path, alter_partition_version_stat.version)); + ops.emplace_back(zkutil::makeSetRequest(alter_partition_version_path, "", -1)); /// Just update version, because merges assignment relies on it ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/log", "", -1)); delimiting_block_lock->getUnlockOps(ops);