mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-25 00:52:02 +00:00
Improve preprocessing performance
This commit is contained in:
parent
4bbcbda7cc
commit
dae65178b7
2
contrib/NuRaft
vendored
2
contrib/NuRaft
vendored
@ -1 +1 @@
|
||||
Subproject commit 83c7bb449677ca6cce6cee63b0ab365ed3e923e8
|
||||
Subproject commit 91b53011045d003622285ff319062aec77f6fdb0
|
@ -358,6 +358,12 @@ void KeeperStorage::UncommittedState::applyDelta(const Delta & delta)
|
||||
acls = operation.acls;
|
||||
last_applied_zxid = delta.zxid;
|
||||
}
|
||||
else if constexpr (std::same_as<DeltaType, AddAuthDelta>)
|
||||
{
|
||||
auto & uncommitted_auth = session_and_auth[operation.session_id];
|
||||
uncommitted_auth.auth.emplace_back(operation.auth_id);
|
||||
uncommitted_auth.zxid = delta.zxid;
|
||||
}
|
||||
},
|
||||
delta.operation);
|
||||
}
|
||||
@ -391,7 +397,10 @@ void KeeperStorage::UncommittedState::commit(int64_t commit_zxid)
|
||||
// delete all cached nodes that were not modified after the commit_zxid
|
||||
// the commit can end on SubDeltaEnd so we don't want to clear cached nodes too soon
|
||||
if (deltas.empty() || deltas.front().zxid > commit_zxid)
|
||||
{
|
||||
std::erase_if(nodes, [commit_zxid](const auto & node) { return node.second.zxid == commit_zxid; });
|
||||
std::erase_if(session_and_auth, [commit_zxid](const auto & auth) { return auth.second.zxid == commit_zxid; });
|
||||
}
|
||||
}
|
||||
|
||||
void KeeperStorage::UncommittedState::rollback(int64_t rollback_zxid)
|
||||
@ -405,10 +414,12 @@ void KeeperStorage::UncommittedState::rollback(int64_t rollback_zxid)
|
||||
deltas.back().zxid,
|
||||
rollback_zxid);
|
||||
|
||||
auto delta_it = deltas.rbegin();
|
||||
|
||||
// we need to undo ephemeral mapping modifications
|
||||
// CreateNodeDelta added ephemeral for session id -> we need to remove it
|
||||
// RemoveNodeDelta removed ephemeral for session id -> we need to add it back
|
||||
for (auto delta_it = deltas.rbegin(); delta_it != deltas.rend(); ++delta_it)
|
||||
for (; delta_it != deltas.rend(); ++delta_it)
|
||||
{
|
||||
if (delta_it->zxid < rollback_zxid)
|
||||
break;
|
||||
@ -432,18 +443,29 @@ void KeeperStorage::UncommittedState::rollback(int64_t rollback_zxid)
|
||||
},
|
||||
delta_it->operation);
|
||||
}
|
||||
else if (auto * add_auth = std::get_if<AddAuthDelta>(&delta_it->operation))
|
||||
{
|
||||
auto & uncommitted_auth = session_and_auth[add_auth->session_id].auth;
|
||||
assert(uncommitted_auth.back() == add_auth->auth_id);
|
||||
uncommitted_auth.pop_back();
|
||||
if (uncommitted_auth.empty())
|
||||
session_and_auth.erase(add_auth->session_id);
|
||||
}
|
||||
}
|
||||
|
||||
std::erase_if(deltas, [rollback_zxid](const auto & delta) { return delta.zxid == rollback_zxid; });
|
||||
if (delta_it == deltas.rend())
|
||||
deltas.clear();
|
||||
else
|
||||
deltas.erase(delta_it.base(), deltas.end());
|
||||
|
||||
std::unordered_set<std::string> deleted_nodes;
|
||||
absl::flat_hash_set<std::string> deleted_nodes;
|
||||
std::erase_if(
|
||||
nodes,
|
||||
[&, rollback_zxid](const auto & node)
|
||||
{
|
||||
if (node.second.zxid == rollback_zxid)
|
||||
{
|
||||
deleted_nodes.emplace(node.first);
|
||||
deleted_nodes.emplace(std::move(node.first));
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
|
@ -211,6 +211,7 @@ public:
|
||||
|
||||
String path;
|
||||
int64_t zxid;
|
||||
int64_t session_id;
|
||||
Operation operation;
|
||||
};
|
||||
|
||||
@ -229,23 +230,28 @@ public:
|
||||
|
||||
bool hasACL(int64_t session_id, bool is_local, std::function<bool(const AuthID &)> predicate)
|
||||
{
|
||||
for (const auto & session_auth : storage.session_and_auth[session_id])
|
||||
const auto check_auth = [&](const auto & auth_ids)
|
||||
{
|
||||
if (predicate(session_auth))
|
||||
return true;
|
||||
}
|
||||
for (const auto & auth : auth_ids)
|
||||
{
|
||||
if (predicate(auth))
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
};
|
||||
|
||||
if (is_local)
|
||||
return check_auth(storage.session_and_auth[session_id]);
|
||||
|
||||
if (check_auth(storage.session_and_auth[session_id]))
|
||||
return true;
|
||||
|
||||
// check if there are uncommitted
|
||||
const auto auth_it = session_and_auth.find(session_id);
|
||||
if (auth_it == session_and_auth.end())
|
||||
return false;
|
||||
|
||||
for (const auto & delta : deltas)
|
||||
{
|
||||
if (const auto * auth_delta = std::get_if<KeeperStorage::AddAuthDelta>(&delta.operation);
|
||||
auth_delta && auth_delta->session_id == session_id && predicate(auth_delta->auth_id))
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
return check_auth(auth_it->second.auth);
|
||||
}
|
||||
|
||||
std::shared_ptr<Node> tryGetNodeFromStorage(StringRef path) const;
|
||||
@ -257,7 +263,35 @@ public:
|
||||
int64_t zxid{0};
|
||||
};
|
||||
|
||||
mutable std::unordered_map<std::string, UncommittedNode> nodes;
|
||||
struct Hash {
|
||||
auto operator()(const std::string_view view) const {
|
||||
SipHash hash;
|
||||
hash.update(view);
|
||||
return hash.get64();
|
||||
}
|
||||
|
||||
using is_transparent = void; // required to make find() work with different type than key_type
|
||||
};
|
||||
|
||||
struct Equal {
|
||||
auto operator()(const std::string_view a,
|
||||
const std::string_view b) const {
|
||||
return a == b;
|
||||
}
|
||||
|
||||
using is_transparent = void; // required to make find() work with different type than key_type
|
||||
};
|
||||
mutable std::unordered_map<std::string, UncommittedNode, Hash, Equal> nodes;
|
||||
|
||||
struct UncommittedAuth
|
||||
{
|
||||
AuthIDs auth;
|
||||
int64_t zxid{0};
|
||||
};
|
||||
|
||||
using SessionAndAuth = std::unordered_map<int64_t, AuthIDs>;
|
||||
mutable std::unordered_map<int64_t, UncommittedAuth> session_and_auth;
|
||||
|
||||
std::list<Delta> deltas;
|
||||
KeeperStorage & storage;
|
||||
};
|
||||
|
Loading…
Reference in New Issue
Block a user