Correctly calculate size after snapshot load

This commit is contained in:
Antonio Andelic 2023-02-06 14:33:45 +00:00
parent 61ddae6c2f
commit aca4f08bf5
12 changed files with 96 additions and 15 deletions

View File

@ -36,7 +36,7 @@ void CoordinationSettings::loadFromConfig(const String & config_elem, const Poco
}
const String KeeperConfigurationAndSettings::DEFAULT_FOUR_LETTER_WORD_CMD = "conf,cons,crst,envi,ruok,srst,srvr,stat,wchs,dirs,mntr,isro,rcvr,apiv,csnp,lgif,rqld";
const String KeeperConfigurationAndSettings::DEFAULT_FOUR_LETTER_WORD_CMD = "conf,cons,crst,envi,ruok,srst,srvr,stat,wchs,dirs,mntr,isro,rcvr,apiv,csnp,lgif,rqld,rclc";
KeeperConfigurationAndSettings::KeeperConfigurationAndSettings()
: server_id(NOT_EXIST)

View File

@ -145,6 +145,9 @@ void FourLetterCommandFactory::registerCommands(KeeperDispatcher & keeper_dispat
FourLetterCommandPtr request_leader_command = std::make_shared<RequestLeaderCommand>(keeper_dispatcher);
factory.registerCommand(request_leader_command);
FourLetterCommandPtr recalculate_command = std::make_shared<RecalculateCommand>(keeper_dispatcher);
factory.registerCommand(recalculate_command);
factory.initializeAllowList(keeper_dispatcher);
factory.setInitialize(true);
}
@ -515,4 +518,10 @@ String RequestLeaderCommand::run()
return keeper_dispatcher.requestLeader() ? "Sent leadership request to leader." : "Failed to send leadership request to leader.";
}
String RecalculateCommand::run()
{
keeper_dispatcher.recalculateStorageStats();
return "ok";
}
}

View File

@ -377,4 +377,17 @@ struct RequestLeaderCommand : public IFourLetterCommand
~RequestLeaderCommand() override = default;
};
/// Request to be leader.
struct RecalculateCommand : public IFourLetterCommand
{
explicit RecalculateCommand(KeeperDispatcher & keeper_dispatcher_)
: IFourLetterCommand(keeper_dispatcher_)
{
}
String name() override { return "rclc"; }
String run() override;
~RecalculateCommand() override = default;
};
}

View File

@ -225,6 +225,11 @@ public:
{
return server->requestLeader();
}
void recalculateStorageStats()
{
return server->recalculateStorageStats();
}
};
}

View File

@ -947,4 +947,9 @@ bool KeeperServer::requestLeader()
return isLeader() || raft_instance->request_leadership();
}
void KeeperServer::recalculateStorageStats()
{
state_machine->recalculateStorageStats();
}
}

View File

@ -137,6 +137,8 @@ public:
KeeperLogInfo getKeeperLogInfo();
bool requestLeader();
void recalculateStorageStats();
};
}

View File

@ -361,19 +361,25 @@ void KeeperStorageSnapshot::deserialize(SnapshotDeserializationResult & deserial
"If you still want to ignore it, you can set 'keeper_server.ignore_system_path_on_startup' to true",
error_msg);
}
else if (match_result == EXACT && !is_node_empty(node))
else if (match_result == EXACT)
{
if (keeper_context->ignore_system_path_on_startup || keeper_context->server_state != KeeperContext::Phase::INIT)
if (!is_node_empty(node))
{
LOG_ERROR(&Poco::Logger::get("KeeperSnapshotManager"), "{}. Ignoring it", error_msg);
node = KeeperStorage::Node{};
if (keeper_context->ignore_system_path_on_startup || keeper_context->server_state != KeeperContext::Phase::INIT)
{
LOG_ERROR(&Poco::Logger::get("KeeperSnapshotManager"), "{}. Ignoring it", error_msg);
node = KeeperStorage::Node{};
}
else
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"{}. Ignoring it can lead to data loss. "
"If you still want to ignore it, you can set 'keeper_server.ignore_system_path_on_startup' to true",
error_msg);
}
else
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"{}. Ignoring it can lead to data loss. "
"If you still want to ignore it, you can set 'keeper_server.ignore_system_path_on_startup' to true",
error_msg);
// we always ignore the written size for this node
node.recalculateSize();
}
storage.container.insertOrReplace(path, node);
@ -390,7 +396,7 @@ void KeeperStorageSnapshot::deserialize(SnapshotDeserializationResult & deserial
{
auto parent_path = parentPath(itr.key);
storage.container.updateValue(
parent_path, [path = itr.key](KeeperStorage::Node & value) { value.addChild(getBaseName(path)); });
parent_path, [version, path = itr.key](KeeperStorage::Node & value) { value.addChild(getBaseName(path), /*update_size*/ version < SnapshotVersion::V4); });
}
}

View File

@ -640,4 +640,12 @@ ClusterConfigPtr KeeperStateMachine::getClusterConfig() const
return nullptr;
}
void KeeperStateMachine::recalculateStorageStats()
{
std::lock_guard lock(storage_and_responses_lock);
LOG_INFO(log, "Recalculating storage stats");
storage->recalculateStats();
LOG_INFO(log, "Done recalculating storage stats");
}
}

View File

@ -103,6 +103,7 @@ public:
uint64_t getKeyArenaSize() const;
uint64_t getLatestSnapshotBufSize() const;
void recalculateStorageStats();
private:
/// In our state machine we always have a single snapshot which is stored
/// in memory in compressed (serialized) format.

View File

@ -201,9 +201,10 @@ void KeeperStorage::Node::setData(String new_data)
data = std::move(new_data);
}
void KeeperStorage::Node::addChild(StringRef child_path)
void KeeperStorage::Node::addChild(StringRef child_path, bool update_size)
{
size_bytes += sizeof child_path;
if (update_size) [[likely]]
size_bytes += sizeof child_path;
children.insert(child_path);
}
@ -234,6 +235,16 @@ void KeeperStorage::Node::shallowCopy(const KeeperStorage::Node & other)
cached_digest = other.cached_digest;
}
void KeeperStorage::Node::recalculateSize()
{
size_bytes = sizeof(Node);
for (const auto child_path : children)
size_bytes += sizeof child_path;
size_bytes += data.size();
}
KeeperStorage::KeeperStorage(
int64_t tick_time_ms, const String & superdigest_, const KeeperContextPtr & keeper_context_, const bool initialize_system_nodes)
: session_expiry_queue(tick_time_ms), keeper_context(keeper_context_), superdigest(superdigest_)
@ -2407,5 +2418,10 @@ uint64_t KeeperStorage::getTotalEphemeralNodesCount() const
return ret;
}
void KeeperStorage::recalculateStats()
{
container.recalculateDataSize();
}
}

View File

@ -47,7 +47,7 @@ public:
const auto & getData() const noexcept { return data; }
void addChild(StringRef child_path);
void addChild(StringRef child_path, bool update_size = true);
void removeChild(StringRef child_path);
@ -64,6 +64,8 @@ public:
// (e.g. we don't need to copy list of children)
void shallowCopy(const Node & other);
void recalculateSize();
private:
String data;
ChildrenSet children{};
@ -466,6 +468,7 @@ public:
void dumpWatchesByPath(WriteBufferFromOwnString & buf) const;
void dumpSessionsAndEphemerals(WriteBufferFromOwnString & buf) const;
void recalculateStats();
private:
void removeDigest(const Node & node, std::string_view path);
void addDigest(const Node & node, std::string_view path);

View File

@ -8,6 +8,8 @@
#include <atomic>
#include <iostream>
#include <Common/logger_useful.h>
namespace DB
{
@ -319,6 +321,17 @@ public:
return approximate_data_size;
}
void recalculateDataSize()
{
approximate_data_size = 0;
for (auto & node : list)
{
node.value.recalculateSize();
approximate_data_size += node.key.size;
approximate_data_size += node.value.sizeInBytes();
}
}
uint64_t keyArenaSize() const
{
return arena.size();