mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 23:52:03 +00:00
Merge pull request #40627 from ClickHouse/fix-install-snapshot
Keeper fix for install snapshot and preprocessing performance improvements
This commit is contained in:
commit
cecdcb5059
2
contrib/NuRaft
vendored
2
contrib/NuRaft
vendored
@ -1 +1 @@
|
||||
Subproject commit 33f60f961d4914441b684af43e9e5535078ba54b
|
||||
Subproject commit bdba298189e29995892de78dcecf64d127444e81
|
@ -370,6 +370,7 @@ void KeeperServer::startup(const Poco::Util::AbstractConfiguration & config, boo
|
||||
{
|
||||
auto log_entries = log_store->log_entries(state_machine->last_commit_index() + 1, next_log_idx);
|
||||
|
||||
size_t preprocessed = 0;
|
||||
LOG_INFO(log, "Preprocessing {} log entries", log_entries->size());
|
||||
auto idx = state_machine->last_commit_index() + 1;
|
||||
for (const auto & entry : *log_entries)
|
||||
@ -378,7 +379,12 @@ void KeeperServer::startup(const Poco::Util::AbstractConfiguration & config, boo
|
||||
state_machine->pre_commit(idx, entry->get_buf());
|
||||
|
||||
++idx;
|
||||
++preprocessed;
|
||||
|
||||
if (preprocessed % 50000 == 0)
|
||||
LOG_TRACE(log, "Preprocessed {}/{} entries", preprocessed, log_entries->size());
|
||||
}
|
||||
LOG_INFO(log, "Preprocessing done");
|
||||
}
|
||||
|
||||
loadLatestConfig();
|
||||
|
@ -369,7 +369,15 @@ void KeeperStorage::UncommittedState::addDeltas(std::vector<Delta> new_deltas)
|
||||
const auto & added_delta = deltas.emplace_back(std::move(delta));
|
||||
|
||||
if (!added_delta.path.empty())
|
||||
{
|
||||
deltas_for_path[added_delta.path].push_back(&added_delta);
|
||||
applyDelta(added_delta);
|
||||
}
|
||||
else if (const auto * auth_delta = std::get_if<AddAuthDelta>(&added_delta.operation))
|
||||
{
|
||||
auto & uncommitted_auth = session_and_auth[auth_delta->session_id];
|
||||
uncommitted_auth.emplace_back(&auth_delta->auth_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -385,6 +393,26 @@ void KeeperStorage::UncommittedState::commit(int64_t commit_zxid)
|
||||
break;
|
||||
}
|
||||
|
||||
auto & front_delta = deltas.front();
|
||||
|
||||
if (!front_delta.path.empty())
|
||||
{
|
||||
auto & path_deltas = deltas_for_path.at(front_delta.path);
|
||||
assert(path_deltas.front() == &front_delta);
|
||||
path_deltas.pop_front();
|
||||
if (path_deltas.empty())
|
||||
deltas_for_path.erase(front_delta.path);
|
||||
}
|
||||
else if (auto * add_auth = std::get_if<AddAuthDelta>(&front_delta.operation))
|
||||
{
|
||||
auto & uncommitted_auth = session_and_auth[add_auth->session_id];
|
||||
assert(!uncommitted_auth.empty() && uncommitted_auth.front() == &add_auth->auth_id);
|
||||
uncommitted_auth.pop_front();
|
||||
if (uncommitted_auth.empty())
|
||||
session_and_auth.erase(add_auth->session_id);
|
||||
|
||||
}
|
||||
|
||||
deltas.pop_front();
|
||||
}
|
||||
|
||||
@ -405,10 +433,12 @@ void KeeperStorage::UncommittedState::rollback(int64_t rollback_zxid)
|
||||
deltas.back().zxid,
|
||||
rollback_zxid);
|
||||
|
||||
auto delta_it = deltas.rbegin();
|
||||
|
||||
// we need to undo ephemeral mapping modifications
|
||||
// CreateNodeDelta added ephemeral for session id -> we need to remove it
|
||||
// RemoveNodeDelta removed ephemeral for session id -> we need to add it back
|
||||
for (auto delta_it = deltas.rbegin(); delta_it != deltas.rend(); ++delta_it)
|
||||
for (; delta_it != deltas.rend(); ++delta_it)
|
||||
{
|
||||
if (delta_it->zxid < rollback_zxid)
|
||||
break;
|
||||
@ -431,29 +461,56 @@ void KeeperStorage::UncommittedState::rollback(int64_t rollback_zxid)
|
||||
}
|
||||
},
|
||||
delta_it->operation);
|
||||
|
||||
auto & path_deltas = deltas_for_path.at(delta_it->path);
|
||||
if (path_deltas.back() == &*delta_it)
|
||||
{
|
||||
path_deltas.pop_back();
|
||||
if (path_deltas.empty())
|
||||
deltas_for_path.erase(delta_it->path);
|
||||
}
|
||||
}
|
||||
else if (auto * add_auth = std::get_if<AddAuthDelta>(&delta_it->operation))
|
||||
{
|
||||
auto & uncommitted_auth = session_and_auth[add_auth->session_id];
|
||||
if (uncommitted_auth.back() == &add_auth->auth_id)
|
||||
{
|
||||
uncommitted_auth.pop_back();
|
||||
if (uncommitted_auth.empty())
|
||||
session_and_auth.erase(add_auth->session_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
std::erase_if(deltas, [rollback_zxid](const auto & delta) { return delta.zxid == rollback_zxid; });
|
||||
if (delta_it == deltas.rend())
|
||||
deltas.clear();
|
||||
else
|
||||
deltas.erase(delta_it.base(), deltas.end());
|
||||
|
||||
std::unordered_set<std::string> deleted_nodes;
|
||||
absl::flat_hash_set<std::string> deleted_nodes;
|
||||
std::erase_if(
|
||||
nodes,
|
||||
[&, rollback_zxid](const auto & node)
|
||||
{
|
||||
if (node.second.zxid == rollback_zxid)
|
||||
{
|
||||
deleted_nodes.emplace(node.first);
|
||||
deleted_nodes.emplace(std::move(node.first));
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
});
|
||||
|
||||
// recalculate all the uncommitted deleted nodes
|
||||
for (const auto & delta : deltas)
|
||||
for (const auto & deleted_node : deleted_nodes)
|
||||
{
|
||||
if (!delta.path.empty() && deleted_nodes.contains(delta.path))
|
||||
applyDelta(delta);
|
||||
auto path_delta_it = deltas_for_path.find(deleted_node);
|
||||
if (path_delta_it != deltas_for_path.end())
|
||||
{
|
||||
for (const auto & delta : path_delta_it->second)
|
||||
{
|
||||
applyDelta(*delta);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -229,27 +229,42 @@ public:
|
||||
|
||||
bool hasACL(int64_t session_id, bool is_local, std::function<bool(const AuthID &)> predicate)
|
||||
{
|
||||
for (const auto & session_auth : storage.session_and_auth[session_id])
|
||||
const auto check_auth = [&](const auto & auth_ids)
|
||||
{
|
||||
if (predicate(session_auth))
|
||||
return true;
|
||||
}
|
||||
for (const auto & auth : auth_ids)
|
||||
{
|
||||
using TAuth = std::remove_reference_t<decltype(auth)>;
|
||||
|
||||
const AuthID * auth_ptr = nullptr;
|
||||
if constexpr (std::is_pointer_v<TAuth>)
|
||||
auth_ptr = auth;
|
||||
else
|
||||
auth_ptr = &auth;
|
||||
|
||||
if (predicate(*auth_ptr))
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
};
|
||||
|
||||
if (is_local)
|
||||
return check_auth(storage.session_and_auth[session_id]);
|
||||
|
||||
if (check_auth(storage.session_and_auth[session_id]))
|
||||
return true;
|
||||
|
||||
// check if there are uncommitted
|
||||
const auto auth_it = session_and_auth.find(session_id);
|
||||
if (auth_it == session_and_auth.end())
|
||||
return false;
|
||||
|
||||
for (const auto & delta : deltas)
|
||||
{
|
||||
if (const auto * auth_delta = std::get_if<KeeperStorage::AddAuthDelta>(&delta.operation);
|
||||
auth_delta && auth_delta->session_id == session_id && predicate(auth_delta->auth_id))
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
return check_auth(auth_it->second);
|
||||
}
|
||||
|
||||
std::shared_ptr<Node> tryGetNodeFromStorage(StringRef path) const;
|
||||
|
||||
std::unordered_map<int64_t, std::list<const AuthID *>> session_and_auth;
|
||||
|
||||
struct UncommittedNode
|
||||
{
|
||||
std::shared_ptr<Node> node{nullptr};
|
||||
@ -257,7 +272,32 @@ public:
|
||||
int64_t zxid{0};
|
||||
};
|
||||
|
||||
mutable std::unordered_map<std::string, UncommittedNode> nodes;
|
||||
struct Hash
|
||||
{
|
||||
auto operator()(const std::string_view view) const
|
||||
{
|
||||
SipHash hash;
|
||||
hash.update(view);
|
||||
return hash.get64();
|
||||
}
|
||||
|
||||
using is_transparent = void; // required to make find() work with different type than key_type
|
||||
};
|
||||
|
||||
struct Equal
|
||||
{
|
||||
auto operator()(const std::string_view a,
|
||||
const std::string_view b) const
|
||||
{
|
||||
return a == b;
|
||||
}
|
||||
|
||||
using is_transparent = void; // required to make find() work with different type than key_type
|
||||
};
|
||||
|
||||
mutable std::unordered_map<std::string, UncommittedNode, Hash, Equal> nodes;
|
||||
std::unordered_map<std::string, std::list<const Delta *>, Hash, Equal> deltas_for_path;
|
||||
|
||||
std::list<Delta> deltas;
|
||||
KeeperStorage & storage;
|
||||
};
|
||||
|
Loading…
Reference in New Issue
Block a user