mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-19 16:20:50 +00:00
Merge pull request #60226 from ClickHouse/keeper-better-snapshot
Slightly better Keeper loading from snapshot
This commit is contained in:
commit
7ea44f95e3
@ -20,7 +20,6 @@
|
||||
#include <Core/Field.h>
|
||||
#include <Disks/DiskLocal.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
@ -99,9 +98,12 @@ namespace
|
||||
|
||||
void readNode(KeeperStorage::Node & node, ReadBuffer & in, SnapshotVersion version, ACLMap & acl_map)
|
||||
{
|
||||
String new_data;
|
||||
readBinary(new_data, in);
|
||||
node.setData(new_data);
|
||||
readVarUInt(node.data_size, in);
|
||||
if (node.data_size != 0)
|
||||
{
|
||||
node.data = std::unique_ptr<char[]>(new char[node.data_size]);
|
||||
in.readStrict(node.data.get(), node.data_size);
|
||||
}
|
||||
|
||||
if (version >= SnapshotVersion::V1)
|
||||
{
|
||||
@ -373,25 +375,36 @@ void KeeperStorageSnapshot::deserialize(SnapshotDeserializationResult & deserial
|
||||
size_t snapshot_container_size;
|
||||
readBinary(snapshot_container_size, in);
|
||||
|
||||
storage.container.reserve(snapshot_container_size);
|
||||
|
||||
if (recalculate_digest)
|
||||
storage.nodes_digest = 0;
|
||||
|
||||
for (size_t nodes_read = 0; nodes_read < snapshot_container_size; ++nodes_read)
|
||||
{
|
||||
std::string path;
|
||||
readBinary(path, in);
|
||||
size_t path_size = 0;
|
||||
readVarUInt(path_size, in);
|
||||
chassert(path_size != 0);
|
||||
auto path_data = storage.container.allocateKey(path_size);
|
||||
in.readStrict(path_data.get(), path_size);
|
||||
std::string_view path{path_data.get(), path_size};
|
||||
|
||||
KeeperStorage::Node node{};
|
||||
readNode(node, in, current_version, storage.acl_map);
|
||||
|
||||
using enum Coordination::PathMatchResult;
|
||||
auto match_result = Coordination::matchPath(path, keeper_system_path);
|
||||
|
||||
const std::string error_msg = fmt::format("Cannot read node on path {} from a snapshot because it is used as a system node", path);
|
||||
const auto get_error_msg = [&]
|
||||
{
|
||||
return fmt::format("Cannot read node on path {} from a snapshot because it is used as a system node", path);
|
||||
};
|
||||
|
||||
if (match_result == IS_CHILD)
|
||||
{
|
||||
if (keeper_context->ignoreSystemPathOnStartup() || keeper_context->getServerState() != KeeperContext::Phase::INIT)
|
||||
{
|
||||
LOG_ERROR(getLogger("KeeperSnapshotManager"), "{}. Ignoring it", error_msg);
|
||||
LOG_ERROR(getLogger("KeeperSnapshotManager"), "{}. Ignoring it", get_error_msg());
|
||||
continue;
|
||||
}
|
||||
else
|
||||
@ -399,7 +412,7 @@ void KeeperStorageSnapshot::deserialize(SnapshotDeserializationResult & deserial
|
||||
ErrorCodes::LOGICAL_ERROR,
|
||||
"{}. Ignoring it can lead to data loss. "
|
||||
"If you still want to ignore it, you can set 'keeper_server.ignore_system_path_on_startup' to true",
|
||||
error_msg);
|
||||
get_error_msg());
|
||||
}
|
||||
else if (match_result == EXACT)
|
||||
{
|
||||
@ -407,7 +420,7 @@ void KeeperStorageSnapshot::deserialize(SnapshotDeserializationResult & deserial
|
||||
{
|
||||
if (keeper_context->ignoreSystemPathOnStartup() || keeper_context->getServerState() != KeeperContext::Phase::INIT)
|
||||
{
|
||||
LOG_ERROR(getLogger("KeeperSnapshotManager"), "{}. Ignoring it", error_msg);
|
||||
LOG_ERROR(getLogger("KeeperSnapshotManager"), "{}. Ignoring it", get_error_msg());
|
||||
node = KeeperStorage::Node{};
|
||||
}
|
||||
else
|
||||
@ -415,18 +428,25 @@ void KeeperStorageSnapshot::deserialize(SnapshotDeserializationResult & deserial
|
||||
ErrorCodes::LOGICAL_ERROR,
|
||||
"{}. Ignoring it can lead to data loss. "
|
||||
"If you still want to ignore it, you can set 'keeper_server.ignore_system_path_on_startup' to true",
|
||||
error_msg);
|
||||
get_error_msg());
|
||||
}
|
||||
}
|
||||
|
||||
storage.container.insertOrReplace(path, node);
|
||||
if (node.isEphemeral())
|
||||
storage.ephemerals[node.ephemeralOwner()].insert(path);
|
||||
auto ephemeral_owner = node.ephemeralOwner();
|
||||
if (!node.isEphemeral() && node.numChildren() > 0)
|
||||
node.getChildren().reserve(node.numChildren());
|
||||
|
||||
if (ephemeral_owner != 0)
|
||||
storage.ephemerals[node.ephemeralOwner()].insert(std::string{path});
|
||||
|
||||
if (recalculate_digest)
|
||||
storage.nodes_digest += node.getDigest(path);
|
||||
|
||||
storage.container.insertOrReplace(std::move(path_data), path_size, std::move(node));
|
||||
}
|
||||
|
||||
LOG_TRACE(getLogger("KeeperSnapshotManager"), "Building structure for children nodes");
|
||||
|
||||
for (const auto & itr : storage.container)
|
||||
{
|
||||
if (itr.key != "/")
|
||||
|
@ -233,6 +233,38 @@ KeeperStorage::Node::Node(const Node & other)
|
||||
*this = other;
|
||||
}
|
||||
|
||||
KeeperStorage::Node & KeeperStorage::Node::operator=(Node && other) noexcept
|
||||
{
|
||||
if (this == &other)
|
||||
return *this;
|
||||
|
||||
czxid = other.czxid;
|
||||
mzxid = other.mzxid;
|
||||
pzxid = other.pzxid;
|
||||
acl_id = other.acl_id;
|
||||
mtime = other.mtime;
|
||||
is_ephemeral_and_ctime = other.is_ephemeral_and_ctime;
|
||||
ephemeral_or_children_data = other.ephemeral_or_children_data;
|
||||
version = other.version;
|
||||
cversion = other.cversion;
|
||||
aversion = other.aversion;
|
||||
|
||||
data_size = other.data_size;
|
||||
data = std::move(other.data);
|
||||
|
||||
other.data_size = 0;
|
||||
|
||||
static_assert(std::is_nothrow_move_assignable_v<ChildrenSet>);
|
||||
children = std::move(other.children);
|
||||
|
||||
return *this;
|
||||
}
|
||||
|
||||
KeeperStorage::Node::Node(Node && other) noexcept
|
||||
{
|
||||
*this = std::move(other);
|
||||
}
|
||||
|
||||
bool KeeperStorage::Node::empty() const
|
||||
{
|
||||
return data_size == 0 && mzxid == 0;
|
||||
|
@ -52,9 +52,11 @@ public:
|
||||
Node() = default;
|
||||
|
||||
Node & operator=(const Node & other);
|
||||
|
||||
Node(const Node & other);
|
||||
|
||||
Node & operator=(Node && other) noexcept;
|
||||
Node(Node && other) noexcept;
|
||||
|
||||
bool empty() const;
|
||||
|
||||
bool isEphemeral() const
|
||||
@ -146,6 +148,7 @@ public:
|
||||
void removeChild(StringRef child_path);
|
||||
|
||||
const auto & getChildren() const noexcept { return children; }
|
||||
auto & getChildren() { return children; }
|
||||
|
||||
// Invalidate the calculated digest so it's recalculated again on the next
|
||||
// getDigest call
|
||||
|
@ -169,6 +169,49 @@ private:
|
||||
}
|
||||
}
|
||||
|
||||
void insertOrReplace(StringRef key, V value, bool owns_key)
|
||||
{
|
||||
size_t hash_value = map.hash(key);
|
||||
auto new_value_size = value.sizeInBytes();
|
||||
auto it = map.find(key, hash_value);
|
||||
uint64_t old_value_size = it == map.end() ? 0 : it->getMapped()->value.sizeInBytes();
|
||||
|
||||
if (it == map.end())
|
||||
{
|
||||
auto list_key = owns_key ? key : copyStringInArena(arena, key);
|
||||
ListElem elem{list_key, std::move(value)};
|
||||
elem.setVersion(current_version);
|
||||
auto itr = list.insert(list.end(), std::move(elem));
|
||||
bool inserted;
|
||||
map.emplace(itr->key, it, inserted, hash_value);
|
||||
itr->setActiveInMap();
|
||||
chassert(inserted);
|
||||
it->getMapped() = itr;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (owns_key)
|
||||
arena.free(key.data, key.size);
|
||||
|
||||
auto list_itr = it->getMapped();
|
||||
if (snapshot_mode)
|
||||
{
|
||||
ListElem elem{list_itr->key, std::move(value)};
|
||||
elem.setVersion(current_version);
|
||||
list_itr->setInactiveInMap();
|
||||
auto new_list_itr = list.insert(list.end(), std::move(elem));
|
||||
it->getMapped() = new_list_itr;
|
||||
snapshot_invalid_iters.push_back(list_itr);
|
||||
}
|
||||
else
|
||||
{
|
||||
list_itr->value = std::move(value);
|
||||
}
|
||||
}
|
||||
updateDataSize(INSERT_OR_REPLACE, key.size, new_value_size, old_value_size, !snapshot_mode);
|
||||
}
|
||||
|
||||
|
||||
public:
|
||||
|
||||
using iterator = typename List::iterator;
|
||||
@ -203,41 +246,39 @@ public:
|
||||
return std::make_pair(it, false);
|
||||
}
|
||||
|
||||
void insertOrReplace(const std::string & key, const V & value)
|
||||
void reserve(size_t node_num)
|
||||
{
|
||||
size_t hash_value = map.hash(key);
|
||||
auto it = map.find(key, hash_value);
|
||||
uint64_t old_value_size = it == map.end() ? 0 : it->getMapped()->value.sizeInBytes();
|
||||
map.reserve(node_num);
|
||||
}
|
||||
|
||||
if (it == map.end())
|
||||
void insertOrReplace(const std::string & key, V value)
|
||||
{
|
||||
insertOrReplace(key, std::move(value), /*owns_key*/ false);
|
||||
}
|
||||
|
||||
struct KeyDeleter
|
||||
{
|
||||
void operator()(const char * key)
|
||||
{
|
||||
ListElem elem{copyStringInArena(arena, key), value};
|
||||
elem.setVersion(current_version);
|
||||
auto itr = list.insert(list.end(), std::move(elem));
|
||||
bool inserted;
|
||||
map.emplace(itr->key, it, inserted, hash_value);
|
||||
itr->setActiveInMap();
|
||||
chassert(inserted);
|
||||
it->getMapped() = itr;
|
||||
if (key)
|
||||
arena->free(key, size);
|
||||
}
|
||||
else
|
||||
{
|
||||
auto list_itr = it->getMapped();
|
||||
if (snapshot_mode)
|
||||
{
|
||||
ListElem elem{list_itr->key, value};
|
||||
elem.setVersion(current_version);
|
||||
list_itr->setInactiveInMap();
|
||||
auto new_list_itr = list.insert(list.end(), std::move(elem));
|
||||
it->getMapped() = new_list_itr;
|
||||
snapshot_invalid_iters.push_back(list_itr);
|
||||
}
|
||||
else
|
||||
{
|
||||
list_itr->value = value;
|
||||
}
|
||||
}
|
||||
updateDataSize(INSERT_OR_REPLACE, key.size(), value.sizeInBytes(), old_value_size, !snapshot_mode);
|
||||
|
||||
size_t size;
|
||||
GlobalArena * arena;
|
||||
};
|
||||
|
||||
using KeyPtr = std::unique_ptr<char[], KeyDeleter>;
|
||||
|
||||
KeyPtr allocateKey(size_t size)
|
||||
{
|
||||
return KeyPtr{new char[size], KeyDeleter{size, &arena}};
|
||||
}
|
||||
|
||||
void insertOrReplace(KeyPtr key_data, size_t key_size, V value)
|
||||
{
|
||||
StringRef key{key_data.release(), key_size};
|
||||
insertOrReplace(key, std::move(value), /*owns_key*/ true);
|
||||
}
|
||||
|
||||
bool erase(const std::string & key)
|
||||
|
Loading…
Reference in New Issue
Block a user