ClickHouse/src/Coordination/KeeperSnapshotManager.cpp

811 lines
28 KiB
C++
Raw Normal View History

2021-03-01 13:33:34 +00:00
#include <Compression/CompressedReadBuffer.h>
#include <Compression/CompressedWriteBuffer.h>
2022-05-19 09:45:38 +00:00
#include <Coordination/KeeperSnapshotManager.h>
2021-03-01 13:33:34 +00:00
#include <Coordination/ReadBufferFromNuraftBuffer.h>
#include <Coordination/WriteBufferFromNuraftBuffer.h>
#include <IO/ReadBufferFromFile.h>
2022-05-19 09:45:38 +00:00
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteHelpers.h>
2021-03-01 13:33:34 +00:00
#include <IO/copyData.h>
2022-05-19 09:45:38 +00:00
#include <Common/ZooKeeper/ZooKeeperIO.h>
2021-03-01 13:33:34 +00:00
#include <filesystem>
#include <memory>
2022-06-10 13:10:45 +00:00
#include <Common/logger_useful.h>
#include <Coordination/KeeperContext.h>
2023-05-17 13:32:51 +00:00
#include <Coordination/pathUtils.h>
2022-07-21 09:31:06 +00:00
#include <Coordination/KeeperConstants.h>
#include <Common/ZooKeeper/ZooKeeperCommon.h>
2023-05-30 13:22:40 +00:00
#include "Core/Field.h"
#include <Disks/DiskLocal.h>
2021-01-21 11:07:55 +00:00
namespace DB
{
2021-03-01 13:33:34 +00:00
namespace ErrorCodes
{
extern const int UNKNOWN_FORMAT_VERSION;
extern const int UNKNOWN_SNAPSHOT;
2022-06-10 13:10:45 +00:00
extern const int LOGICAL_ERROR;
2021-03-01 13:33:34 +00:00
}
2021-01-21 11:07:55 +00:00
namespace
{
2023-05-30 13:22:40 +00:00
constexpr std::string_view tmp_prefix = "tmp_";
/// Copies `path_from` on `disk_from` to `path_to` on `disk_to` and then removes the source file.
///
/// An empty marker file named `tmp_<file name>` (placed next to the source path, but created on the
/// destination disk) exists for the duration of the copy, so an interrupted copy can be detected.
/// Once the marker is gone the copy is complete and either disk holds a usable file, which is why
/// a failure between removing the marker and removing the source is harmless.
void moveFileBetweenDisks(DiskPtr disk_from, const std::string & path_from, DiskPtr disk_to, const std::string & path_to)
{
    const fs::path source_path{path_from};
    const auto marker_file_name = std::string{tmp_prefix} + source_path.filename().string();
    const auto marker_path = source_path.parent_path() / marker_file_name;

    {
        /// Create the (empty) "copy in progress" marker on the destination disk.
        disk_to->writeFile(marker_path);
    }

    disk_from->copyFile(source_path, *disk_to, path_to, {});

    /// Copy finished: drop the marker first, then the original.
    disk_to->removeFile(marker_path);
    disk_from->removeFile(path_from);
}
2021-04-08 14:17:57 +00:00
uint64_t getSnapshotPathUpToLogIdx(const String & snapshot_path)
2021-03-01 13:33:34 +00:00
{
std::filesystem::path path(snapshot_path);
std::string filename = path.stem();
Strings name_parts;
splitInto<'_'>(name_parts, filename);
2021-04-08 14:17:57 +00:00
return parse<uint64_t>(name_parts[1]);
2021-03-01 13:33:34 +00:00
}
/// Builds the snapshot file name for the given log index:
/// `snapshot_<idx>.bin`, with an extra `.zstd` suffix when zstd compression is requested.
std::string getSnapshotFileName(uint64_t up_to_log_idx, bool compress_zstd)
{
    std::string file_name = "snapshot_";
    file_name += std::to_string(up_to_log_idx);
    file_name += ".bin";

    if (compress_zstd)
        file_name += ".zstd";

    return file_name;
}
2021-12-09 15:09:56 +00:00
/// Writes a single storage node to `out` in the binary snapshot format.
///
/// Field order: data, ACL id, sequential flag, all stat fields, seq_num and,
/// starting from snapshot version V4, the cached node size.
void writeNode(const KeeperStorage::Node & node, SnapshotVersion version, WriteBuffer & out)
{
    writeBinary(node.getData(), out);

    /// The ACL itself is not stored with the node, only its id.
    writeBinary(node.acl_id, out);
    writeBinary(node.is_sequental, out);

    /// Stat fields, in a fixed order that readNode mirrors.
    const auto & stat = node.stat;
    writeBinary(stat.czxid, out);
    writeBinary(stat.mzxid, out);
    writeBinary(stat.ctime, out);
    writeBinary(stat.mtime, out);
    writeBinary(stat.version, out);
    writeBinary(stat.cversion, out);
    writeBinary(stat.aversion, out);
    writeBinary(stat.ephemeralOwner, out);
    writeBinary(stat.dataLength, out);
    writeBinary(stat.numChildren, out);
    writeBinary(stat.pzxid, out);

    writeBinary(node.seq_num, out);

    /// size_bytes is part of the format only since V4.
    if (version >= SnapshotVersion::V4)
        writeBinary(node.size_bytes, out);
}
2021-01-21 11:07:55 +00:00
/// Reads a single storage node from `in`; the inverse of writeNode.
///
/// For V1 and newer the node carries an ACL id. For V0 the ACL list itself is stored inline and is
/// converted to an id through `acl_map`. In both cases the usage of the resulting id is registered
/// in `acl_map`.
void readNode(KeeperStorage::Node & node, ReadBuffer & in, SnapshotVersion version, ACLMap & acl_map)
{
    String data;
    readBinary(data, in);
    node.setData(std::move(data));

    if (version >= SnapshotVersion::V1)
    {
        readBinary(node.acl_id, in);
    }
    else if (version == SnapshotVersion::V0)
    {
        /// Old format: the full ACL list is serialized together with the node.
        size_t acl_count;
        readBinary(acl_count, in);

        Coordination::ACLs node_acls;
        for (size_t acl_idx = 0; acl_idx < acl_count; ++acl_idx)
        {
            Coordination::ACL acl;
            readBinary(acl.permissions, in);
            readBinary(acl.scheme, in);
            readBinary(acl.id, in);
            node_acls.push_back(acl);
        }
        node.acl_id = acl_map.convertACLs(node_acls);
    }

    /// Some strange ACLID during deserialization from ZooKeeper
    if (node.acl_id == std::numeric_limits<uint64_t>::max())
        node.acl_id = 0;

    acl_map.addUsage(node.acl_id);

    readBinary(node.is_sequental, in);

    /// Stat fields, in the same order writeNode emits them.
    auto & stat = node.stat;
    readBinary(stat.czxid, in);
    readBinary(stat.mzxid, in);
    readBinary(stat.ctime, in);
    readBinary(stat.mtime, in);
    readBinary(stat.version, in);
    readBinary(stat.cversion, in);
    readBinary(stat.aversion, in);
    readBinary(stat.ephemeralOwner, in);
    readBinary(stat.dataLength, in);
    readBinary(stat.numChildren, in);
    readBinary(stat.pzxid, in);

    readBinary(node.seq_num, in);

    /// size_bytes is present only since V4.
    if (version >= SnapshotVersion::V4)
        readBinary(node.size_bytes, in);
}
2021-03-02 14:30:56 +00:00
void serializeSnapshotMetadata(const SnapshotMetadataPtr & snapshot_meta, WriteBuffer & out)
{
auto buffer = snapshot_meta->serialize();
2021-03-04 13:02:30 +00:00
writeVarUInt(buffer->size(), out);
out.write(reinterpret_cast<const char *>(buffer->data_begin()), buffer->size());
2021-03-02 14:30:56 +00:00
}
/// Reads snapshot metadata written by serializeSnapshotMetadata:
/// a var-uint length followed by that many bytes of serialized metadata.
SnapshotMetadataPtr deserializeSnapshotMetadata(ReadBuffer & in)
{
    size_t meta_size;
    readVarUInt(meta_size, in);

    auto meta_buffer = nuraft::buffer::alloc(meta_size);
    in.readStrict(reinterpret_cast<char *>(meta_buffer->data_begin()), meta_size);

    /// Rewind the buffer position before handing it to the deserializer.
    meta_buffer->pos(0);
    return SnapshotMetadata::deserialize(*meta_buffer);
}
2021-01-21 11:07:55 +00:00
}
2022-07-23 14:27:44 +00:00
void KeeperStorageSnapshot::serialize(const KeeperStorageSnapshot & snapshot, WriteBuffer & out, KeeperContextPtr keeper_context)
2021-01-21 11:07:55 +00:00
{
2021-03-04 13:02:30 +00:00
writeBinary(static_cast<uint8_t>(snapshot.version), out);
2021-03-02 14:30:56 +00:00
serializeSnapshotMetadata(snapshot.snapshot_meta, out);
2022-05-13 08:24:44 +00:00
if (snapshot.version >= SnapshotVersion::V5)
2022-05-16 13:08:10 +00:00
{
2022-05-13 08:24:44 +00:00
writeBinary(snapshot.zxid, out);
2023-05-17 13:32:51 +00:00
if (keeper_context->digestEnabled())
2022-05-19 09:45:38 +00:00
{
writeBinary(static_cast<uint8_t>(KeeperStorage::CURRENT_DIGEST_VERSION), out);
writeBinary(snapshot.nodes_digest, out);
}
else
writeBinary(static_cast<uint8_t>(KeeperStorage::NO_DIGEST), out);
2022-05-16 13:08:10 +00:00
}
2022-05-13 08:24:44 +00:00
2021-03-04 13:02:30 +00:00
writeBinary(snapshot.session_id, out);
/// Better to sort before serialization, otherwise snapshots can be different on different replicas
std::vector<std::pair<int64_t, Coordination::ACLs>> sorted_acl_map(snapshot.acl_map.begin(), snapshot.acl_map.end());
2022-06-13 13:31:08 +00:00
::sort(sorted_acl_map.begin(), sorted_acl_map.end());
/// Serialize ACLs map
writeBinary(sorted_acl_map.size(), out);
for (const auto & [acl_id, acls] : sorted_acl_map)
{
writeBinary(acl_id, out);
writeBinary(acls.size(), out);
for (const auto & acl : acls)
{
writeBinary(acl.permissions, out);
writeBinary(acl.scheme, out);
writeBinary(acl.id, out);
}
}
/// Serialize data tree
2022-07-22 10:55:13 +00:00
writeBinary(snapshot.snapshot_container_size - child_system_paths_with_data.size(), out);
2021-03-01 15:32:27 +00:00
size_t counter = 0;
for (auto it = snapshot.begin; counter < snapshot.snapshot_container_size; ++counter)
2021-03-01 13:33:34 +00:00
{
const auto & path = it->key;
2022-07-22 08:07:38 +00:00
// write only the root system path because of digest
if (Coordination::matchPath(path.toView(), keeper_system_path) == Coordination::PathMatchResult::IS_CHILD)
2022-07-22 10:55:13 +00:00
{
2022-11-25 12:50:42 +00:00
if (counter == snapshot.snapshot_container_size - 1)
break;
2022-07-22 10:55:13 +00:00
++it;
2022-07-22 08:07:38 +00:00
continue;
2022-07-22 10:55:13 +00:00
}
2022-07-22 08:07:38 +00:00
2021-03-01 13:33:34 +00:00
const auto & node = it->value;
2022-07-21 09:31:06 +00:00
2022-01-22 16:30:45 +00:00
/// Benign race condition possible while taking snapshot: NuRaft decide to create snapshot at some log id
/// and only after some time we lock storage and enable snapshot mode. So snapshot_container_size can be
2022-01-22 19:36:23 +00:00
/// slightly bigger than required.
2022-05-19 09:45:38 +00:00
if (node.stat.mzxid > snapshot.zxid)
2022-01-22 16:30:45 +00:00
break;
2021-03-03 12:21:21 +00:00
2021-03-04 13:02:30 +00:00
writeBinary(path, out);
2021-12-09 15:09:56 +00:00
writeNode(node, snapshot.version, out);
/// Last iteration: check and exit here without iterator increment. Otherwise
/// false positive race condition on list end is possible.
if (counter == snapshot.snapshot_container_size - 1)
break;
++it;
2021-03-01 13:33:34 +00:00
}
/// Session must be saved in a sorted order,
/// otherwise snapshots will be different
2022-05-19 09:45:38 +00:00
std::vector<std::pair<int64_t, int64_t>> sorted_session_and_timeout(
snapshot.session_and_timeout.begin(), snapshot.session_and_timeout.end());
2022-06-13 13:31:08 +00:00
::sort(sorted_session_and_timeout.begin(), sorted_session_and_timeout.end());
/// Serialize sessions
size_t size = sorted_session_and_timeout.size();
2021-03-04 13:02:30 +00:00
writeBinary(size, out);
for (const auto & [session_id, timeout] : sorted_session_and_timeout)
2021-01-21 11:07:55 +00:00
{
2021-03-04 13:02:30 +00:00
writeBinary(session_id, out);
writeBinary(timeout, out);
KeeperStorage::AuthIDs ids;
if (snapshot.session_and_auth.contains(session_id))
ids = snapshot.session_and_auth.at(session_id);
writeBinary(ids.size(), out);
for (const auto & [scheme, id] : ids)
{
writeBinary(scheme, out);
writeBinary(id, out);
}
2021-01-21 11:07:55 +00:00
}
2021-10-18 15:27:51 +00:00
2021-10-19 14:10:09 +00:00
/// Serialize cluster config
2021-10-18 15:27:51 +00:00
if (snapshot.cluster_config)
{
auto buffer = snapshot.cluster_config->serialize();
writeVarUInt(buffer->size(), out);
out.write(reinterpret_cast<const char *>(buffer->data_begin()), buffer->size());
}
2021-01-21 11:07:55 +00:00
}
2022-07-23 14:27:44 +00:00
/// Reads a snapshot from `in` into `deserialization_result`: the snapshot metadata, the contents of
/// `deserialization_result.storage` (zxid, digest, session id counter, ACL map, nodes, sessions)
/// and, if the stream has remaining data, the cluster config.
///
/// Throws UNKNOWN_FORMAT_VERSION if the snapshot version is newer than CURRENT_SNAPSHOT_VERSION,
/// and LOGICAL_ERROR if the snapshot contains data under the system path that may not be ignored
/// (see the per-node handling below).
void KeeperStorageSnapshot::deserialize(SnapshotDeserializationResult & deserialization_result, ReadBuffer & in, KeeperContextPtr keeper_context)
{
    uint8_t version;
    readBinary(version, in);
    SnapshotVersion current_version = static_cast<SnapshotVersion>(version);
    if (current_version > CURRENT_SNAPSHOT_VERSION)
        throw Exception(ErrorCodes::UNKNOWN_FORMAT_VERSION, "Unsupported snapshot version {}", version);

    deserialization_result.snapshot_meta = deserializeSnapshotMetadata(in);
    KeeperStorage & storage = *deserialization_result.storage;

    /// The digest is recalculated from the nodes unless the snapshot carries one of the current version.
    bool recalculate_digest = keeper_context->digestEnabled();
    if (version >= SnapshotVersion::V5)
    {
        readBinary(storage.zxid, in);

        uint8_t digest_version;
        readBinary(digest_version, in);
        if (digest_version != KeeperStorage::DigestVersion::NO_DIGEST)
        {
            /// The digest value must be consumed from the stream even if its version is not usable.
            uint64_t nodes_digest;
            readBinary(nodes_digest, in);
            if (digest_version == KeeperStorage::CURRENT_DIGEST_VERSION)
            {
                storage.nodes_digest = nodes_digest;
                recalculate_digest = false;
            }
        }

        storage.old_snapshot_zxid = 0;
    }
    else
    {
        /// Snapshots before V5 do not store zxid; the last log index from the metadata is used instead.
        storage.zxid = deserialization_result.snapshot_meta->get_last_log_idx();
        storage.old_snapshot_zxid = storage.zxid;
    }

    int64_t session_id;
    readBinary(session_id, in);
    storage.session_id_counter = session_id;

    /// Before V1 we serialized ACL without acl_map
    if (current_version >= SnapshotVersion::V1)
    {
        size_t acls_map_size;
        readBinary(acls_map_size, in);

        size_t current_map_size = 0;
        while (current_map_size < acls_map_size)
        {
            uint64_t acl_id;
            readBinary(acl_id, in);

            size_t acls_size;
            readBinary(acls_size, in);
            Coordination::ACLs acls;
            for (size_t i = 0; i < acls_size; ++i)
            {
                Coordination::ACL acl;
                readBinary(acl.permissions, in);
                readBinary(acl.scheme, in);
                readBinary(acl.id, in);
                acls.push_back(acl);
            }
            storage.acl_map.addMapping(acl_id, acls);
            current_map_size++;
        }
    }

    size_t snapshot_container_size;
    readBinary(snapshot_container_size, in);

    if (recalculate_digest)
        storage.nodes_digest = 0;

    const auto is_node_empty = [](const auto & node)
    {
        return node.getData().empty() && node.stat == Coordination::Stat{};
    };

    for (size_t nodes_read = 0; nodes_read < snapshot_container_size; ++nodes_read)
    {
        std::string path;
        readBinary(path, in);
        KeeperStorage::Node node{};
        readNode(node, in, current_version, storage.acl_map);

        using enum Coordination::PathMatchResult;
        auto match_result = Coordination::matchPath(path, keeper_system_path);

        /// Built lazily: the message is needed only for nodes under the system path,
        /// so regular nodes do not pay for formatting it.
        const auto get_error_msg = [&path]
        {
            return fmt::format("Cannot read node on path {} from a snapshot because it is used as a system node", path);
        };

        if (match_result == IS_CHILD)
        {
            /// Children of the system path are dropped when that is allowed, otherwise loading fails.
            if (keeper_context->ignoreSystemPathOnStartup() || keeper_context->getServerState() != KeeperContext::Phase::INIT)
            {
                LOG_ERROR(&Poco::Logger::get("KeeperSnapshotManager"), "{}. Ignoring it", get_error_msg());
                continue;
            }
            else
                throw Exception(
                    ErrorCodes::LOGICAL_ERROR,
                    "{}. Ignoring it can lead to data loss. "
                    "If you still want to ignore it, you can set 'keeper_server.ignore_system_path_on_startup' to true",
                    get_error_msg());
        }
        else if (match_result == EXACT)
        {
            /// The system path node itself is kept, but only as an empty node.
            if (!is_node_empty(node))
            {
                if (keeper_context->ignoreSystemPathOnStartup() || keeper_context->getServerState() != KeeperContext::Phase::INIT)
                {
                    LOG_ERROR(&Poco::Logger::get("KeeperSnapshotManager"), "{}. Ignoring it", get_error_msg());
                    node = KeeperStorage::Node{};
                }
                else
                    throw Exception(
                        ErrorCodes::LOGICAL_ERROR,
                        "{}. Ignoring it can lead to data loss. "
                        "If you still want to ignore it, you can set 'keeper_server.ignore_system_path_on_startup' to true",
                        get_error_msg());
            }

            // we always ignore the written size for this node
            node.recalculateSize();
        }

        storage.container.insertOrReplace(path, node);
        if (node.stat.ephemeralOwner != 0)
            storage.ephemerals[node.stat.ephemeralOwner].insert(path);

        if (recalculate_digest)
            storage.nodes_digest += node.getDigest(path);
    }

    /// Children lists are not stored in the snapshot; rebuild them by registering every node in its parent.
    /// Node sizes are updated along the way only for snapshots older than V4.
    for (const auto & itr : storage.container)
    {
        if (itr.key != "/")
        {
            auto parent_path = parentNodePath(itr.key);
            storage.container.updateValue(
                parent_path, [version, path = itr.key](KeeperStorage::Node & value) { value.addChild(getBaseNodeName(path), /*update_size*/ version < SnapshotVersion::V4); });
        }
    }

    /// Sanity check: the stored children counter must agree with the rebuilt children set.
    for (const auto & itr : storage.container)
    {
        if (itr.key != "/")
        {
            if (itr.value.stat.numChildren != static_cast<int32_t>(itr.value.getChildren().size()))
            {
#ifdef NDEBUG
                /// TODO (alesapin) remove this, it should be always CORRUPTED_DATA.
                LOG_ERROR(&Poco::Logger::get("KeeperSnapshotManager"), "Children counter in stat.numChildren {}"
                            " is different from actual children size {} for node {}", itr.value.stat.numChildren, itr.value.getChildren().size(), itr.key);
#else
                throw Exception(ErrorCodes::LOGICAL_ERROR, "Children counter in stat.numChildren {}"
                            " is different from actual children size {} for node {}",
                            itr.value.stat.numChildren, itr.value.getChildren().size(), itr.key);
#endif
            }
        }
    }

    size_t active_sessions_size;
    readBinary(active_sessions_size, in);

    size_t current_session_size = 0;
    while (current_session_size < active_sessions_size)
    {
        int64_t active_session_id, timeout;
        readBinary(active_session_id, in);
        readBinary(timeout, in);
        storage.addSessionID(active_session_id, timeout);

        /// Auth ids are stored per session since V1.
        if (current_version >= SnapshotVersion::V1)
        {
            size_t session_auths_size;
            readBinary(session_auths_size, in);

            KeeperStorage::AuthIDs ids;
            size_t session_auth_counter = 0;
            while (session_auth_counter < session_auths_size)
            {
                String scheme, id;
                readBinary(scheme, in);
                readBinary(id, in);
                ids.emplace_back(KeeperStorage::AuthID{scheme, id});

                session_auth_counter++;
            }

            if (!ids.empty())
                storage.session_and_auth[active_session_id] = ids;
        }
        current_session_size++;
    }

    /// Optional cluster config
    if (!in.eof())
    {
        size_t data_size;
        readVarUInt(data_size, in);
        auto buffer = nuraft::buffer::alloc(data_size);
        in.readStrict(reinterpret_cast<char *>(buffer->data_begin()), data_size);
        buffer->pos(0);
        deserialization_result.cluster_config = ClusterConfig::deserialize(*buffer);
    }
}
2021-10-18 15:27:51 +00:00
/// Creates a snapshot of `storage_` up to log index `up_to_log_idx_`.
///
/// Builds fresh snapshot metadata (given last log index, term 0 and an empty nuraft cluster config)
/// and delegates to the metadata-taking constructor, so the snapshot setup logic lives in one place.
KeeperStorageSnapshot::KeeperStorageSnapshot(KeeperStorage * storage_, uint64_t up_to_log_idx_, const ClusterConfigPtr & cluster_config_)
    : KeeperStorageSnapshot(
        storage_,
        std::make_shared<SnapshotMetadata>(up_to_log_idx_, 0, std::make_shared<nuraft::cluster_config>()),
        cluster_config_)
{
}
2022-05-19 09:45:38 +00:00
/// Creates a snapshot of `storage_` described by existing metadata `snapshot_meta_`.
///
/// Captures the session id counter, zxid and nodes digest of the storage, switches the storage into
/// snapshot mode and copies the sessions, ACL mapping and session auth info.
/// Snapshot mode is switched off again in the destructor.
KeeperStorageSnapshot::KeeperStorageSnapshot(
    KeeperStorage * storage_, const SnapshotMetadataPtr & snapshot_meta_, const ClusterConfigPtr & cluster_config_)
    : storage(storage_)
    , snapshot_meta(snapshot_meta_)
    , session_id(storage->session_id_counter)
    , cluster_config(cluster_config_)
    , zxid(storage->zxid)
    , nodes_digest(storage->nodes_digest)
{
    /// Size and version of the container are taken together, before snapshot mode is enabled.
    auto [container_size, container_version] = storage->container.snapshotSizeWithVersion();
    snapshot_container_size = container_size;
    storage->enableSnapshotMode(container_version);

    begin = storage->getSnapshotIteratorBegin();

    session_and_timeout = storage->getActiveSessions();
    acl_map = storage->acl_map.getMapping();
    session_and_auth = storage->session_and_auth;
}
2021-03-29 08:24:56 +00:00
/// Switches the storage out of the snapshot mode that the constructor enabled.
KeeperStorageSnapshot::~KeeperStorageSnapshot()
{
    storage->disableSnapshotMode();
}
/// Discovers the snapshots that already exist on the configured disks and registers them in
/// `existing_snapshots`.
///
/// Disks are scanned in this order: old snapshot disks, the snapshot disk and, if it is a different
/// one, the latest-snapshot disk. A snapshot found on a later disk replaces an entry with the same
/// log index found earlier. Files that have a matching `tmp_` marker are treated as incomplete and
/// removed together with the marker. Afterwards outdated snapshots are removed and the remaining
/// ones are moved to the disks they belong to.
KeeperSnapshotManager::KeeperSnapshotManager(
    size_t snapshots_to_keep_,
    const KeeperContextPtr & keeper_context_,
    bool compress_snapshots_zstd_,
    const std::string & superdigest_,
    size_t storage_tick_time_)
    : snapshots_to_keep(snapshots_to_keep_)
    , compress_snapshots_zstd(compress_snapshots_zstd_)
    , superdigest(superdigest_)
    , storage_tick_time(storage_tick_time_)
    , keeper_context(keeper_context_)
{
    const auto load_snapshot_from_disk = [&](const auto & disk)
    {
        LOG_TRACE(log, "Reading from disk {}", disk->getName());

        /// Name of the incomplete file (marker name without the tmp_ prefix) -> path of the marker.
        std::unordered_map<std::string, std::string> incomplete_files;

        /// If `file_path` has a marker, removes both the file and the marker and returns true.
        const auto clean_incomplete_file = [&](const auto & file_path)
        {
            if (auto incomplete_it = incomplete_files.find(fs::path(file_path).filename()); incomplete_it != incomplete_files.end())
            {
                LOG_TRACE(log, "Removing {} from {}", file_path, disk->getName());
                disk->removeFile(file_path);
                disk->removeFile(incomplete_it->second);
                incomplete_files.erase(incomplete_it);
                return true;
            }

            return false;
        };

        std::vector<std::string> snapshot_files;
        for (auto it = disk->iterateDirectory(""); it->isValid(); it->next())
        {
            if (it->name().starts_with(tmp_prefix))
            {
                incomplete_files.emplace(it->name().substr(tmp_prefix.size()), it->path());
                continue;
            }

            if (clean_incomplete_file(it->path()))
                continue;

            snapshot_files.push_back(it->path());
        }

        /// Second pass: a marker may have been listed after the file it belongs to.
        for (const auto & snapshot_file : snapshot_files)
        {
            /// Pass the full path (the lookup by file name happens inside), so that the same path
            /// the directory iterator reported is the one that gets logged and removed.
            if (clean_incomplete_file(snapshot_file))
                continue;

            LOG_TRACE(log, "Found {} on {}", snapshot_file, disk->getName());
            size_t snapshot_up_to = getSnapshotPathUpToLogIdx(snapshot_file);
            auto [_, inserted] = existing_snapshots.insert_or_assign(snapshot_up_to, SnapshotFileInfo{snapshot_file, disk});

            if (!inserted)
                LOG_WARNING(
                    &Poco::Logger::get("KeeperSnapshotManager"),
                    "Found another snapshots with last log idx {}, will use snapshot from disk {}",
                    snapshot_up_to,
                    disk->getName());
        }

        /// Markers without a corresponding file are just leftovers.
        for (const auto & [name, path] : incomplete_files)
            disk->removeFile(path);
    };

    for (const auto & disk : keeper_context->getOldSnapshotDisks())
        load_snapshot_from_disk(disk);

    auto disk = getDisk();
    load_snapshot_from_disk(disk);

    auto latest_snapshot_disk = getLatestSnapshotDisk();
    if (latest_snapshot_disk != disk)
        load_snapshot_from_disk(latest_snapshot_disk);

    removeOutdatedSnapshotsIfNeeded();
    moveSnapshotsIfNeeded();
}
2023-05-25 08:43:11 +00:00
/// Writes an already serialized snapshot `buffer` to the latest-snapshot disk as the snapshot file
/// for `up_to_log_idx`, registers it in `existing_snapshots` and returns its location.
///
/// An empty `tmp_`-prefixed marker file exists while the snapshot file is being written; the
/// constructor uses such markers to detect and remove incomplete files. The marker is removed only
/// after the snapshot file has been synced and finalized.
SnapshotFileInfo KeeperSnapshotManager::serializeSnapshotBufferToDisk(nuraft::buffer & buffer, uint64_t up_to_log_idx)
{
    ReadBufferFromNuraftBuffer reader(buffer);

    auto snapshot_file_name = getSnapshotFileName(up_to_log_idx, compress_snapshots_zstd);
    auto tmp_snapshot_file_name = std::string{tmp_prefix} + snapshot_file_name;

    auto disk = getLatestSnapshotDisk();

    {
        /// Create the empty "write in progress" marker.
        auto marker_buf = disk->writeFile(tmp_snapshot_file_name);
        marker_buf->finalize();
    }

    auto plain_buf = disk->writeFile(snapshot_file_name);
    copyData(reader, *plain_buf);
    plain_buf->sync();
    /// Finish the file explicitly before dropping the marker instead of leaving it to the destructor
    /// at the end of the function, after the marker is already gone.
    plain_buf->finalize();

    disk->removeFile(tmp_snapshot_file_name);

    existing_snapshots.emplace(up_to_log_idx, SnapshotFileInfo{snapshot_file_name, disk});
    removeOutdatedSnapshotsIfNeeded();
    moveSnapshotsIfNeeded();

    return {snapshot_file_name, disk};
}
2021-03-29 08:24:56 +00:00
/// Loads the snapshot with the highest log index into a buffer.
///
/// If reading a snapshot throws a DB::Exception, its file is removed, the snapshot is forgotten,
/// the exception is logged and the next most recent snapshot is tried.
/// Returns nullptr when no snapshot could be loaded.
nuraft::ptr<nuraft::buffer> KeeperSnapshotManager::deserializeLatestSnapshotBufferFromDisk()
{
    while (!existing_snapshots.empty())
    {
        auto newest = existing_snapshots.rbegin();
        const auto newest_log_idx = newest->first;

        try
        {
            return deserializeSnapshotBufferFromDisk(newest_log_idx);
        }
        catch (const DB::Exception &)
        {
            /// Unreadable snapshot: delete the file and drop it from the registry.
            const auto & [path, disk] = newest->second;
            disk->removeFile(path);
            existing_snapshots.erase(newest_log_idx);
            tryLogCurrentException(__PRETTY_FUNCTION__);
        }
    }

    return nullptr;
}
2021-04-08 14:17:57 +00:00
/// Reads the file of the snapshot registered under `up_to_log_idx` into a nuraft buffer, as is.
/// The lookup uses `at()`, so an unregistered index results in an exception from the container.
nuraft::ptr<nuraft::buffer> KeeperSnapshotManager::deserializeSnapshotBufferFromDisk(uint64_t up_to_log_idx) const
{
    const auto & file_info = existing_snapshots.at(up_to_log_idx);

    WriteBufferFromNuraftBuffer buffer_writer;
    auto file_reader = file_info.disk->readFile(file_info.path);
    copyData(*file_reader, buffer_writer);

    return buffer_writer.getBuffer();
}
2021-09-27 14:21:10 +00:00
/// Serializes `snapshot` into an in-memory nuraft buffer, compressed either with zstd or with
/// CompressedWriteBuffer, depending on `compress_snapshots_zstd`.
nuraft::ptr<nuraft::buffer> KeeperSnapshotManager::serializeSnapshotToBuffer(const KeeperStorageSnapshot & snapshot) const
{
    auto nuraft_writer = std::make_unique<WriteBufferFromNuraftBuffer>();

    /// Keep a raw pointer: in the zstd case ownership of the writer moves into the compressing
    /// wrapper, but the resulting buffer is still taken from the nuraft writer at the end.
    auto * nuraft_writer_raw = nuraft_writer.get();

    std::unique_ptr<WriteBuffer> compressing_writer;
    if (compress_snapshots_zstd)
        compressing_writer = wrapWriteBufferWithCompressionMethod(std::move(nuraft_writer), CompressionMethod::Zstd, 3);
    else
        compressing_writer = std::make_unique<CompressedWriteBuffer>(*nuraft_writer);

    KeeperStorageSnapshot::serialize(snapshot, *compressing_writer, keeper_context);
    compressing_writer->finalize();

    return nuraft_writer_raw->getBuffer();
}
2021-03-01 13:33:34 +00:00
2021-09-27 14:21:10 +00:00
/// Checks whether `buffer` starts with the 4-byte zstd magic number.
/// The buffer position is reset to 0 after the magic has been read.
bool KeeperSnapshotManager::isZstdCompressed(nuraft::ptr<nuraft::buffer> buffer)
{
    static constexpr unsigned char ZSTD_COMPRESSED_MAGIC[4] = {0x28, 0xB5, 0x2F, 0xFD};

    unsigned char buffer_head[sizeof(ZSTD_COMPRESSED_MAGIC)]{};

    ReadBufferFromNuraftBuffer reader(buffer);
    reader.readStrict(reinterpret_cast<char *>(buffer_head), sizeof(buffer_head));
    buffer->pos(0);

    return memcmp(buffer_head, ZSTD_COMPRESSED_MAGIC, sizeof(ZSTD_COMPRESSED_MAGIC)) == 0;
}
2021-10-18 15:27:51 +00:00
/// Restores a storage from a serialized snapshot `buffer`.
///
/// The compression is detected from the buffer contents (zstd magic, otherwise CompressedReadBuffer).
/// The storage is created without system nodes; they are initialized after the snapshot is loaded.
SnapshotDeserializationResult KeeperSnapshotManager::deserializeSnapshotFromBuffer(nuraft::ptr<nuraft::buffer> buffer) const
{
    const bool zstd_compressed = isZstdCompressed(buffer);

    auto nuraft_reader = std::make_unique<ReadBufferFromNuraftBuffer>(buffer);
    std::unique_ptr<ReadBuffer> decompressing_reader;
    if (zstd_compressed)
        decompressing_reader = wrapReadBufferWithCompressionMethod(std::move(nuraft_reader), CompressionMethod::Zstd);
    else
        decompressing_reader = std::make_unique<CompressedReadBuffer>(*nuraft_reader);

    SnapshotDeserializationResult result;
    result.storage = std::make_unique<KeeperStorage>(storage_tick_time, superdigest, keeper_context, /* initialize_system_nodes */ false);

    KeeperStorageSnapshot::deserialize(result, *decompressing_reader, keeper_context);
    result.storage->initializeSystemNodes();

    return result;
}
2021-10-18 15:27:51 +00:00
/// Restores the storage from the most recent readable snapshot on disk.
/// Returns a default-constructed result when there are no snapshots or none of them could be read.
SnapshotDeserializationResult KeeperSnapshotManager::restoreFromLatestSnapshot()
{
    if (existing_snapshots.empty())
        return {};

    if (auto latest_buffer = deserializeLatestSnapshotBufferFromDisk())
        return deserializeSnapshotFromBuffer(latest_buffer);

    return {};
}
2023-05-22 12:24:16 +00:00
/// Returns the snapshot disk configured in the keeper context.
DiskPtr KeeperSnapshotManager::getDisk() const
{
    return keeper_context->getSnapshotDisk();
}
2023-05-30 13:22:40 +00:00
/// Returns the disk the keeper context designates for the latest snapshot.
DiskPtr KeeperSnapshotManager::getLatestSnapshotDisk() const
{
    return keeper_context->getLatestSnapshotDisk();
}
2021-03-29 08:24:56 +00:00
void KeeperSnapshotManager::removeOutdatedSnapshotsIfNeeded()
2021-03-01 14:54:08 +00:00
{
while (existing_snapshots.size() > snapshots_to_keep)
removeSnapshot(existing_snapshots.begin()->first);
}
2023-05-30 13:22:40 +00:00
/// Moves every registered snapshot to the disk it belongs to: the snapshot with the latest index
/// goes to the latest-snapshot disk, all other snapshots go to the regular snapshot disk.
/// Snapshots that are already on the right disk are left alone.
void KeeperSnapshotManager::moveSnapshotsIfNeeded()
{
    auto disk = getDisk();
    auto latest_snapshot_disk = getLatestSnapshotDisk();
    auto latest_snapshot_idx = getLatestSnapshotIndex();

    for (auto & [idx, file_info] : existing_snapshots)
    {
        const auto & target_disk = (idx == latest_snapshot_idx) ? latest_snapshot_disk : disk;

        if (file_info.disk == target_disk)
            continue;

        /// The path stays the same, only the disk changes.
        moveFileBetweenDisks(file_info.disk, file_info.path, target_disk, file_info.path);
        file_info.disk = target_disk;
    }
}
2021-04-08 14:17:57 +00:00
/// Deletes the file of the snapshot registered under `log_idx` and forgets the snapshot.
/// Throws UNKNOWN_SNAPSHOT if no snapshot with this log index is registered.
void KeeperSnapshotManager::removeSnapshot(uint64_t log_idx)
{
    auto snapshot_it = existing_snapshots.find(log_idx);
    if (snapshot_it == existing_snapshots.end())
        throw Exception(ErrorCodes::UNKNOWN_SNAPSHOT, "Unknown snapshot with log index {}", log_idx);

    const auto & file_info = snapshot_it->second;
    file_info.disk->removeFile(file_info.path);

    existing_snapshots.erase(snapshot_it);
}
2021-03-01 13:33:34 +00:00
2023-05-25 08:43:11 +00:00
/// Serializes `snapshot` directly into a snapshot file on the latest-snapshot disk, registers it in
/// `existing_snapshots` and returns its location.
///
/// The data is compressed either with zstd or with CompressedWriteBuffer, depending on
/// `compress_snapshots_zstd`. An empty `tmp_`-prefixed marker file exists while the snapshot file
/// is being written; the constructor uses such markers to detect and remove incomplete files.
/// The marker is removed only after all write buffers have been flushed and finalized.
SnapshotFileInfo KeeperSnapshotManager::serializeSnapshotToDisk(const KeeperStorageSnapshot & snapshot)
{
    auto up_to_log_idx = snapshot.snapshot_meta->get_last_log_idx();
    auto snapshot_file_name = getSnapshotFileName(up_to_log_idx, compress_snapshots_zstd);
    auto tmp_snapshot_file_name = std::string{tmp_prefix} + snapshot_file_name;

    auto disk = getLatestSnapshotDisk();

    {
        /// Create the empty "write in progress" marker.
        auto marker_buf = disk->writeFile(tmp_snapshot_file_name);
        marker_buf->finalize();
    }

    auto writer = disk->writeFile(snapshot_file_name);
    std::unique_ptr<WriteBuffer> compressed_writer;
    if (compress_snapshots_zstd)
        compressed_writer = wrapWriteBufferWithCompressionMethod(std::move(writer), CompressionMethod::Zstd, 3);
    else
        compressed_writer = std::make_unique<CompressedWriteBuffer>(*writer);

    KeeperStorageSnapshot::serialize(snapshot, *compressed_writer, keeper_context);
    compressed_writer->finalize();
    compressed_writer->sync();

    /// In the non-zstd case the file buffer is only referenced by the compressing buffer and is
    /// still owned here. Flush and finish it explicitly before dropping the marker instead of
    /// leaving it to the destructor at the end of the function, after the marker is already gone.
    /// In the zstd case ownership was moved into the wrapper and `writer` is null.
    if (writer)
    {
        writer->sync();
        writer->finalize();
    }

    disk->removeFile(tmp_snapshot_file_name);

    existing_snapshots.emplace(up_to_log_idx, SnapshotFileInfo{snapshot_file_name, disk});
    removeOutdatedSnapshotsIfNeeded();
    moveSnapshotsIfNeeded();

    return {snapshot_file_name, disk};
}
2021-03-01 13:33:34 +00:00
2021-01-21 11:07:55 +00:00
}