This commit is contained in:
Alexander Tokmakov 2021-05-18 13:00:34 +03:00
parent 20b9af7b26
commit 2dac182df3

View File

@ -581,7 +581,7 @@ void StorageReplicatedMergeTree::createNewZooKeeperNodes()
}
/// For ALTER PARTITION with multi-leaders
zookeeper->createIfNotExists(zookeeper_path + "/drop_range_version", String());
zookeeper->createIfNotExists(zookeeper_path + "/alter_partition_version", String());
}
@ -6217,9 +6217,9 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
static const String TMP_PREFIX = "tmp_replace_from_";
auto zookeeper = getZooKeeper();
String drop_range_version_path = zookeeper_path + "/drop_range_version";
Coordination::Stat drop_range_version_stat;
zookeeper->get(drop_range_version_path, &drop_range_version_stat);
String alter_partition_version_path = zookeeper_path + "/alter_partition_version";
Coordination::Stat alter_partition_version_stat;
zookeeper->get(alter_partition_version_path, &alter_partition_version_stat);
/// Firstly, generate last block number and compute drop_range
/// NOTE: Even if we make ATTACH PARTITION instead of REPLACE PARTITION, drop_range will not be empty: it will contain a block.
@ -6332,8 +6332,8 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
delimiting_block_lock->getUnlockOps(ops);
/// Check and update version to avoid race with DROP_RANGE
ops.emplace_back(zkutil::makeCheckRequest(drop_range_version_path, drop_range_version_stat.version));
ops.emplace_back(zkutil::makeSetRequest(drop_range_version_path, "", -1));
ops.emplace_back(zkutil::makeCheckRequest(alter_partition_version_path, alter_partition_version_stat.version));
ops.emplace_back(zkutil::makeSetRequest(alter_partition_version_path, "", -1));
/// Just update version, because merges assignment relies on it
ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/log", "", -1));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential));
@ -6436,9 +6436,9 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
/// Clone parts into destination table.
String drop_range_version_path = dest_table_storage->zookeeper_path + "/drop_range_version";
Coordination::Stat drop_range_version_stat;
zookeeper->get(drop_range_version_path, &drop_range_version_stat);
String alter_partition_version_path = dest_table_storage->zookeeper_path + "/alter_partition_version";
Coordination::Stat alter_partition_version_stat;
zookeeper->get(alter_partition_version_path, &alter_partition_version_stat);
for (const auto & src_part : src_all_parts)
{
@ -6520,8 +6520,8 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
}
/// Check and update version to avoid race with DROP_RANGE
ops.emplace_back(zkutil::makeCheckRequest(drop_range_version_path, drop_range_version_stat.version));
ops.emplace_back(zkutil::makeSetRequest(drop_range_version_path, "", -1));
ops.emplace_back(zkutil::makeCheckRequest(alter_partition_version_path, alter_partition_version_stat.version));
ops.emplace_back(zkutil::makeSetRequest(alter_partition_version_path, "", -1));
/// Just update version, because merges assignment relies on it
ops.emplace_back(zkutil::makeSetRequest(dest_table_storage->zookeeper_path + "/log", "", -1));
ops.emplace_back(zkutil::makeCreateRequest(dest_table_storage->zookeeper_path + "/log/log-",
@ -6574,16 +6574,27 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
dest_table_storage->waitForAllReplicasToProcessLogEntry(entry);
}
Coordination::Requests ops_src;
/// Create DROP_RANGE for the source table
alter_partition_version_path = zookeeper_path + "/alter_partition_version";
zookeeper->get(alter_partition_version_path, &alter_partition_version_stat);
Coordination::Requests ops_src;
ops_src.emplace_back(zkutil::makeCreateRequest(
zookeeper_path + "/log/log-", entry_delete.toString(), zkutil::CreateMode::PersistentSequential));
/// Check and update version to avoid race with REPLACE_RANGE
ops_src.emplace_back(zkutil::makeCheckRequest(alter_partition_version_path, alter_partition_version_stat.version));
ops_src.emplace_back(zkutil::makeSetRequest(alter_partition_version_path, "", -1));
/// Just update version, because merges assignment relies on it
ops_src.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/log", "", -1));
delimiting_block_lock->getUnlockOps(ops_src);
op_results = zookeeper->multi(ops_src);
delimiting_block_lock->assumeUnlocked();
Coordination::Error code = zookeeper->tryMulti(ops_src, op_results);
if (code == Coordination::Error::ZBADVERSION)
throw Exception(ErrorCodes::CANNOT_ASSIGN_ALTER, "Cannot DROP PARTITION in {} after copying partition to {}, "
"because another ALTER PARTITION query was concurrently executed",
getStorageID().getFullTableName(), dest_table_storage->getStorageID().getFullTableName());
else
zkutil::KeeperMultiException::check(code, ops_src, op_results);
log_znode_path = dynamic_cast<const Coordination::CreateResponse &>(*op_results.front()).path_created;
entry_delete.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
@ -6816,9 +6827,9 @@ bool StorageReplicatedMergeTree::dropPart(
bool StorageReplicatedMergeTree::dropAllPartsInPartition(
zkutil::ZooKeeper & zookeeper, String & partition_id, LogEntry & entry, ContextPtr query_context, bool detach)
{
String drop_range_version_path = zookeeper_path + "/drop_range_version";
Coordination::Stat drop_range_version_stat;
zookeeper.get(drop_range_version_path, &drop_range_version_stat);
String alter_partition_version_path = zookeeper_path + "/alter_partition_version";
Coordination::Stat alter_partition_version_stat;
zookeeper.get(alter_partition_version_path, &alter_partition_version_stat);
MergeTreePartInfo drop_range_info;
/// It prevents other replicas from assigning merges which intersect the locked block number.
@ -6847,8 +6858,8 @@ bool StorageReplicatedMergeTree::dropAllPartsInPartition(
/// Check and update version to avoid race with REPLACE_RANGE.
/// Otherwise new parts covered by drop_range_info may appear after execution of current DROP_RANGE entry
/// as a result of execution of concurrently created REPLACE_RANGE entry.
ops.emplace_back(zkutil::makeCheckRequest(drop_range_version_path, drop_range_version_stat.version));
ops.emplace_back(zkutil::makeSetRequest(drop_range_version_path, "", -1));
ops.emplace_back(zkutil::makeCheckRequest(alter_partition_version_path, alter_partition_version_stat.version));
ops.emplace_back(zkutil::makeSetRequest(alter_partition_version_path, "", -1));
/// Just update version, because merges assignment relies on it
ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/log", "", -1));
delimiting_block_lock->getUnlockOps(ops);