mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 23:21:59 +00:00
Merge pull request #27125 from nicelulu/fix_create_znode
The behavior of clickhouse-keeper to create znode is consistent with zookeeper
This commit is contained in:
commit
09df5018f9
@ -248,117 +248,117 @@ struct KeeperStorageCreateRequest final : public KeeperStorageRequest
|
||||
Coordination::ZooKeeperCreateResponse & response = dynamic_cast<Coordination::ZooKeeperCreateResponse &>(*response_ptr);
|
||||
Coordination::ZooKeeperCreateRequest & request = dynamic_cast<Coordination::ZooKeeperCreateRequest &>(*zk_request);
|
||||
|
||||
if (container.contains(request.path))
|
||||
auto parent_path = parentPath(request.path);
|
||||
auto it = container.find(parent_path);
|
||||
|
||||
if (it == container.end())
|
||||
{
|
||||
response.error = Coordination::Error::ZNONODE;
|
||||
return { response_ptr, undo };
|
||||
}
|
||||
else if (it->value.stat.ephemeralOwner != 0)
|
||||
{
|
||||
response.error = Coordination::Error::ZNOCHILDRENFOREPHEMERALS;
|
||||
return { response_ptr, undo };
|
||||
}
|
||||
std::string path_created = request.path;
|
||||
if (request.is_sequential)
|
||||
{
|
||||
auto seq_num = it->value.seq_num;
|
||||
|
||||
std::stringstream seq_num_str; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
|
||||
seq_num_str.exceptions(std::ios::failbit);
|
||||
seq_num_str << std::setw(10) << std::setfill('0') << seq_num;
|
||||
|
||||
path_created += seq_num_str.str();
|
||||
}
|
||||
if (container.contains(path_created))
|
||||
{
|
||||
response.error = Coordination::Error::ZNODEEXISTS;
|
||||
return { response_ptr, undo };
|
||||
}
|
||||
else
|
||||
auto child_path = getBaseName(path_created);
|
||||
if (child_path.empty())
|
||||
{
|
||||
auto parent_path = parentPath(request.path);
|
||||
auto it = container.find(parent_path);
|
||||
|
||||
if (it == container.end())
|
||||
{
|
||||
response.error = Coordination::Error::ZNONODE;
|
||||
}
|
||||
else if (it->value.stat.ephemeralOwner != 0)
|
||||
{
|
||||
response.error = Coordination::Error::ZNOCHILDRENFOREPHEMERALS;
|
||||
}
|
||||
else
|
||||
{
|
||||
auto & session_auth_ids = storage.session_and_auth[session_id];
|
||||
|
||||
KeeperStorage::Node created_node;
|
||||
|
||||
Coordination::ACLs node_acls;
|
||||
if (!fixupACL(request.acls, session_auth_ids, node_acls, !request.restored_from_zookeeper_log))
|
||||
{
|
||||
response.error = Coordination::Error::ZINVALIDACL;
|
||||
return {response_ptr, {}};
|
||||
}
|
||||
|
||||
uint64_t acl_id = storage.acl_map.convertACLs(node_acls);
|
||||
storage.acl_map.addUsage(acl_id);
|
||||
|
||||
created_node.acl_id = acl_id;
|
||||
created_node.stat.czxid = zxid;
|
||||
created_node.stat.mzxid = zxid;
|
||||
created_node.stat.pzxid = zxid;
|
||||
created_node.stat.ctime = std::chrono::system_clock::now().time_since_epoch() / std::chrono::milliseconds(1);
|
||||
created_node.stat.mtime = created_node.stat.ctime;
|
||||
created_node.stat.numChildren = 0;
|
||||
created_node.stat.dataLength = request.data.length();
|
||||
created_node.stat.ephemeralOwner = request.is_ephemeral ? session_id : 0;
|
||||
created_node.data = request.data;
|
||||
created_node.is_sequental = request.is_sequential;
|
||||
|
||||
std::string path_created = request.path;
|
||||
|
||||
if (request.is_sequential)
|
||||
{
|
||||
auto seq_num = it->value.seq_num;
|
||||
|
||||
std::stringstream seq_num_str; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
|
||||
seq_num_str.exceptions(std::ios::failbit);
|
||||
seq_num_str << std::setw(10) << std::setfill('0') << seq_num;
|
||||
|
||||
path_created += seq_num_str.str();
|
||||
}
|
||||
|
||||
int32_t parent_cversion = request.parent_cversion;
|
||||
auto child_path = getBaseName(path_created);
|
||||
int64_t prev_parent_zxid;
|
||||
int32_t prev_parent_cversion;
|
||||
container.updateValue(parent_path, [child_path, zxid, &prev_parent_zxid,
|
||||
parent_cversion, &prev_parent_cversion] (KeeperStorage::Node & parent)
|
||||
{
|
||||
|
||||
parent.children.insert(child_path);
|
||||
prev_parent_cversion = parent.stat.cversion;
|
||||
prev_parent_zxid = parent.stat.pzxid;
|
||||
|
||||
/// Increment sequential number even if node is not sequential
|
||||
++parent.seq_num;
|
||||
|
||||
if (parent_cversion == -1)
|
||||
++parent.stat.cversion;
|
||||
else if (parent_cversion > parent.stat.cversion)
|
||||
parent.stat.cversion = parent_cversion;
|
||||
|
||||
if (zxid > parent.stat.pzxid)
|
||||
parent.stat.pzxid = zxid;
|
||||
++parent.stat.numChildren;
|
||||
});
|
||||
|
||||
response.path_created = path_created;
|
||||
container.insert(path_created, std::move(created_node));
|
||||
|
||||
if (request.is_ephemeral)
|
||||
ephemerals[session_id].emplace(path_created);
|
||||
|
||||
undo = [&storage, prev_parent_zxid, prev_parent_cversion, session_id, path_created, is_ephemeral = request.is_ephemeral, parent_path, child_path, acl_id]
|
||||
{
|
||||
storage.container.erase(path_created);
|
||||
storage.acl_map.removeUsage(acl_id);
|
||||
|
||||
if (is_ephemeral)
|
||||
storage.ephemerals[session_id].erase(path_created);
|
||||
|
||||
storage.container.updateValue(parent_path, [child_path, prev_parent_zxid, prev_parent_cversion] (KeeperStorage::Node & undo_parent)
|
||||
{
|
||||
--undo_parent.stat.numChildren;
|
||||
--undo_parent.seq_num;
|
||||
undo_parent.stat.cversion = prev_parent_cversion;
|
||||
undo_parent.stat.pzxid = prev_parent_zxid;
|
||||
undo_parent.children.erase(child_path);
|
||||
});
|
||||
};
|
||||
|
||||
response.error = Coordination::Error::ZOK;
|
||||
}
|
||||
response.error = Coordination::Error::ZBADARGUMENTS;
|
||||
return { response_ptr, undo };
|
||||
}
|
||||
|
||||
auto & session_auth_ids = storage.session_and_auth[session_id];
|
||||
|
||||
KeeperStorage::Node created_node;
|
||||
|
||||
Coordination::ACLs node_acls;
|
||||
if (!fixupACL(request.acls, session_auth_ids, node_acls, !request.restored_from_zookeeper_log))
|
||||
{
|
||||
response.error = Coordination::Error::ZINVALIDACL;
|
||||
return {response_ptr, {}};
|
||||
}
|
||||
|
||||
uint64_t acl_id = storage.acl_map.convertACLs(node_acls);
|
||||
storage.acl_map.addUsage(acl_id);
|
||||
|
||||
created_node.acl_id = acl_id;
|
||||
created_node.stat.czxid = zxid;
|
||||
created_node.stat.mzxid = zxid;
|
||||
created_node.stat.pzxid = zxid;
|
||||
created_node.stat.ctime = std::chrono::system_clock::now().time_since_epoch() / std::chrono::milliseconds(1);
|
||||
created_node.stat.mtime = created_node.stat.ctime;
|
||||
created_node.stat.numChildren = 0;
|
||||
created_node.stat.dataLength = request.data.length();
|
||||
created_node.stat.ephemeralOwner = request.is_ephemeral ? session_id : 0;
|
||||
created_node.data = request.data;
|
||||
created_node.is_sequental = request.is_sequential;
|
||||
|
||||
int32_t parent_cversion = request.parent_cversion;
|
||||
int64_t prev_parent_zxid;
|
||||
int32_t prev_parent_cversion;
|
||||
container.updateValue(parent_path, [child_path, zxid, &prev_parent_zxid,
|
||||
parent_cversion, &prev_parent_cversion] (KeeperStorage::Node & parent)
|
||||
{
|
||||
|
||||
parent.children.insert(child_path);
|
||||
prev_parent_cversion = parent.stat.cversion;
|
||||
prev_parent_zxid = parent.stat.pzxid;
|
||||
|
||||
/// Increment sequential number even if node is not sequential
|
||||
++parent.seq_num;
|
||||
|
||||
if (parent_cversion == -1)
|
||||
++parent.stat.cversion;
|
||||
else if (parent_cversion > parent.stat.cversion)
|
||||
parent.stat.cversion = parent_cversion;
|
||||
|
||||
if (zxid > parent.stat.pzxid)
|
||||
parent.stat.pzxid = zxid;
|
||||
++parent.stat.numChildren;
|
||||
});
|
||||
|
||||
response.path_created = path_created;
|
||||
container.insert(path_created, std::move(created_node));
|
||||
|
||||
if (request.is_ephemeral)
|
||||
ephemerals[session_id].emplace(path_created);
|
||||
|
||||
undo = [&storage, prev_parent_zxid, prev_parent_cversion, session_id, path_created, is_ephemeral = request.is_ephemeral, parent_path, child_path, acl_id]
|
||||
{
|
||||
storage.container.erase(path_created);
|
||||
storage.acl_map.removeUsage(acl_id);
|
||||
|
||||
if (is_ephemeral)
|
||||
storage.ephemerals[session_id].erase(path_created);
|
||||
|
||||
storage.container.updateValue(parent_path, [child_path, prev_parent_zxid, prev_parent_cversion] (KeeperStorage::Node & undo_parent)
|
||||
{
|
||||
--undo_parent.stat.numChildren;
|
||||
--undo_parent.seq_num;
|
||||
undo_parent.stat.cversion = prev_parent_cversion;
|
||||
undo_parent.stat.pzxid = prev_parent_zxid;
|
||||
undo_parent.children.erase(child_path);
|
||||
});
|
||||
};
|
||||
|
||||
response.error = Coordination::Error::ZOK;
|
||||
return { response_ptr, undo };
|
||||
}
|
||||
};
|
||||
|
@ -90,6 +90,46 @@ def test_sequential_nodes(started_cluster):
|
||||
genuine_childs = list(sorted(genuine_zk.get_children("/test_sequential_nodes")))
|
||||
fake_childs = list(sorted(fake_zk.get_children("/test_sequential_nodes")))
|
||||
assert genuine_childs == fake_childs
|
||||
|
||||
genuine_zk.create("/test_sequential_nodes_1")
|
||||
fake_zk.create("/test_sequential_nodes_1")
|
||||
|
||||
genuine_zk.create("/test_sequential_nodes_1/a", sequence=True)
|
||||
fake_zk.create("/test_sequential_nodes_1/a", sequence=True)
|
||||
|
||||
genuine_zk.create("/test_sequential_nodes_1/a0000000002")
|
||||
fake_zk.create("/test_sequential_nodes_1/a0000000002")
|
||||
|
||||
genuine_throw = False
|
||||
fake_throw = False
|
||||
try:
|
||||
genuine_zk.create("/test_sequential_nodes_1/a", sequence=True)
|
||||
except Exception as ex:
|
||||
genuine_throw = True
|
||||
|
||||
try:
|
||||
fake_zk.create("/test_sequential_nodes_1/a", sequence=True)
|
||||
except Exception as ex:
|
||||
fake_throw = True
|
||||
|
||||
assert genuine_throw == True
|
||||
assert fake_throw == True
|
||||
|
||||
genuine_childs_1 = list(sorted(genuine_zk.get_children("/test_sequential_nodes_1")))
|
||||
fake_childs_1 = list(sorted(fake_zk.get_children("/test_sequential_nodes_1")))
|
||||
assert genuine_childs_1 == fake_childs_1
|
||||
|
||||
genuine_zk.create("/test_sequential_nodes_2")
|
||||
fake_zk.create("/test_sequential_nodes_2")
|
||||
|
||||
genuine_zk.create("/test_sequential_nodes_2/node")
|
||||
fake_zk.create("/test_sequential_nodes_2/node")
|
||||
genuine_zk.create("/test_sequential_nodes_2/node", sequence=True)
|
||||
fake_zk.create("/test_sequential_nodes_2/node", sequence=True)
|
||||
|
||||
genuine_childs_2 = list(sorted(genuine_zk.get_children("/test_sequential_nodes_2")))
|
||||
fake_childs_2 = list(sorted(fake_zk.get_children("/test_sequential_nodes_2")))
|
||||
assert genuine_childs_2 == fake_childs_2
|
||||
finally:
|
||||
for zk in [genuine_zk, fake_zk]:
|
||||
stop_zk(zk)
|
||||
|
Loading…
Reference in New Issue
Block a user