ClickHouse/src/Backups/BackupCoordinationDistributed.cpp

260 lines
9.9 KiB
C++
Raw Normal View History

2022-04-19 18:15:27 +00:00
#include <Backups/BackupCoordinationDistributed.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/escapeForFileName.h>
#include <Common/hex.h>
namespace DB
{
namespace ErrorCodes
{
extern const int UNEXPECTED_NODE_IN_ZOOKEEPER;
}
/// zookeeper_path/file_names/file_name->checksum_and_size
/// zookeeper_path/file_infos/checksum_and_size->info
/// zookeeper_path/archive_suffixes
/// zookeeper_path/current_archive_suffix
namespace
{
using SizeAndChecksum = IBackupCoordination::SizeAndChecksum;
using FileInfo = IBackupCoordination::FileInfo;
String serializeFileInfo(const FileInfo & info)
{
WriteBufferFromOwnString out;
writeBinary(info.file_name, out);
writeBinary(info.size, out);
writeBinary(info.checksum, out);
writeBinary(info.base_size, out);
writeBinary(info.base_checksum, out);
writeBinary(info.data_file_name, out);
writeBinary(info.archive_suffix, out);
writeBinary(info.pos_in_archive, out);
return out.str();
}
FileInfo deserializeFileInfo(const String & str)
{
FileInfo info;
ReadBufferFromString in{str};
readBinary(info.file_name, in);
readBinary(info.size, in);
readBinary(info.checksum, in);
readBinary(info.base_size, in);
readBinary(info.base_checksum, in);
readBinary(info.data_file_name, in);
readBinary(info.archive_suffix, in);
readBinary(info.pos_in_archive, in);
return info;
}
String serializeSizeAndChecksum(const SizeAndChecksum & size_and_checksum)
{
return getHexUIntLowercase(size_and_checksum.second) + '_' + std::to_string(size_and_checksum.first);
}
SizeAndChecksum deserializeSizeAndChecksum(const String & str)
{
constexpr size_t num_chars_in_checksum = sizeof(UInt128) * 2;
if (str.size() <= num_chars_in_checksum)
throw Exception(
ErrorCodes::UNEXPECTED_NODE_IN_ZOOKEEPER,
"Unexpected size of checksum: {}, must be {}",
str.size(),
num_chars_in_checksum);
UInt128 checksum = unhexUInt<UInt128>(str.data());
UInt64 size = parseFromString<UInt64>(str.substr(num_chars_in_checksum + 1));
return std::pair{size, checksum};
}
/// We try to store data to zookeeper several times due to possible version conflicts.
constexpr size_t NUM_ATTEMPTS = 10;
}
2022-04-19 18:15:27 +00:00
BackupCoordinationDistributed::BackupCoordinationDistributed(const String & zookeeper_path_, zkutil::GetZooKeeper get_zookeeper_)
: zookeeper_path(zookeeper_path_), get_zookeeper(get_zookeeper_)
{
createRootNodes();
}
2022-04-19 18:15:27 +00:00
BackupCoordinationDistributed::~BackupCoordinationDistributed() = default;
2022-04-19 18:15:27 +00:00
void BackupCoordinationDistributed::createRootNodes()
{
auto zookeeper = get_zookeeper();
zookeeper->createAncestors(zookeeper_path);
zookeeper->createIfNotExists(zookeeper_path, "");
zookeeper->createIfNotExists(zookeeper_path + "/file_names", "");
zookeeper->createIfNotExists(zookeeper_path + "/file_infos", "");
zookeeper->createIfNotExists(zookeeper_path + "/archive_suffixes", "");
zookeeper->createIfNotExists(zookeeper_path + "/current_archive_suffix", "0");
}
2022-04-19 18:15:27 +00:00
void BackupCoordinationDistributed::removeAllNodes()
{
auto zookeeper = get_zookeeper();
zookeeper->removeRecursive(zookeeper_path);
}
2022-04-19 18:15:27 +00:00
void BackupCoordinationDistributed::addFileInfo(const FileInfo & file_info, bool & is_data_file_required)
{
auto zookeeper = get_zookeeper();
String full_path = zookeeper_path + "/file_names/" + escapeForFileName(file_info.file_name);
String size_and_checksum = serializeSizeAndChecksum(std::pair{file_info.size, file_info.checksum});
zookeeper->create(full_path, size_and_checksum, zkutil::CreateMode::Persistent);
if (!file_info.size)
{
is_data_file_required = false;
return;
}
full_path = zookeeper_path + "/file_infos/" + size_and_checksum;
auto code = zookeeper->tryCreate(full_path, serializeFileInfo(file_info), zkutil::CreateMode::Persistent);
if ((code != Coordination::Error::ZOK) && (code != Coordination::Error::ZNODEEXISTS))
throw zkutil::KeeperException(code, full_path);
is_data_file_required = (code == Coordination::Error::ZOK) && (file_info.size > file_info.base_size);
}
2022-04-19 18:15:27 +00:00
void BackupCoordinationDistributed::updateFileInfo(const FileInfo & file_info)
{
if (!file_info.size)
return; /// we don't keep FileInfos for empty files, nothing to update
auto zookeeper = get_zookeeper();
String size_and_checksum = serializeSizeAndChecksum(std::pair{file_info.size, file_info.checksum});
String full_path = zookeeper_path + "/file_infos/" + size_and_checksum;
for (size_t attempt = 0; attempt < NUM_ATTEMPTS; ++attempt)
{
Coordination::Stat stat;
auto new_info = deserializeFileInfo(zookeeper->get(full_path, &stat));
new_info.archive_suffix = file_info.archive_suffix;
auto code = zookeeper->trySet(full_path, serializeFileInfo(new_info), stat.version);
if (code == Coordination::Error::ZOK)
return;
bool is_last_attempt = (attempt == NUM_ATTEMPTS - 1);
if ((code != Coordination::Error::ZBADVERSION) || is_last_attempt)
throw zkutil::KeeperException(code, full_path);
}
}
2022-04-19 18:15:27 +00:00
std::vector<FileInfo> BackupCoordinationDistributed::getAllFileInfos() const
{
auto zookeeper = get_zookeeper();
std::vector<FileInfo> file_infos;
Strings escaped_names = zookeeper->getChildren(zookeeper_path + "/file_names");
for (const String & escaped_name : escaped_names)
{
String size_and_checksum = zookeeper->get(zookeeper_path + "/file_names/" + escaped_name);
UInt64 size = deserializeSizeAndChecksum(size_and_checksum).first;
FileInfo file_info;
if (size) /// we don't keep FileInfos for empty files
file_info = deserializeFileInfo(zookeeper->get(zookeeper_path + "/file_infos/" + size_and_checksum));
file_info.file_name = unescapeForFileName(escaped_name);
file_infos.emplace_back(std::move(file_info));
}
return file_infos;
}
2022-04-19 18:15:27 +00:00
Strings BackupCoordinationDistributed::listFiles(const String & prefix, const String & terminator) const
{
auto zookeeper = get_zookeeper();
Strings escaped_names = zookeeper->getChildren(zookeeper_path + "/file_names");
Strings elements;
for (const String & escaped_name : escaped_names)
{
String name = unescapeForFileName(escaped_name);
if (!name.starts_with(prefix))
continue;
size_t start_pos = prefix.length();
size_t end_pos = String::npos;
if (!terminator.empty())
end_pos = name.find(terminator, start_pos);
std::string_view new_element = std::string_view{name}.substr(start_pos, end_pos - start_pos);
if (!elements.empty() && (elements.back() == new_element))
continue;
elements.push_back(String{new_element});
}
std::sort(elements.begin(), elements.end());
return elements;
}
2022-04-19 18:15:27 +00:00
std::optional<FileInfo> BackupCoordinationDistributed::getFileInfo(const String & file_name) const
{
auto zookeeper = get_zookeeper();
String size_and_checksum;
if (!zookeeper->tryGet(zookeeper_path + "/file_names/" + escapeForFileName(file_name), size_and_checksum))
return std::nullopt;
UInt64 size = deserializeSizeAndChecksum(size_and_checksum).first;
FileInfo file_info;
if (size) /// we don't keep FileInfos for empty files
file_info = deserializeFileInfo(zookeeper->get(zookeeper_path + "/file_infos/" + size_and_checksum));
file_info.file_name = file_name;
return file_info;
}
2022-04-19 18:15:27 +00:00
std::optional<FileInfo> BackupCoordinationDistributed::getFileInfo(const SizeAndChecksum & size_and_checksum) const
{
auto zookeeper = get_zookeeper();
String file_info_str;
if (!zookeeper->tryGet(zookeeper_path + "/file_infos/" + serializeSizeAndChecksum(size_and_checksum), file_info_str))
return std::nullopt;
return deserializeFileInfo(file_info_str);
}
2022-04-19 18:15:27 +00:00
std::optional<SizeAndChecksum> BackupCoordinationDistributed::getFileSizeAndChecksum(const String & file_name) const
{
auto zookeeper = get_zookeeper();
String size_and_checksum;
if (!zookeeper->tryGet(zookeeper_path + "/file_names/" + escapeForFileName(file_name), size_and_checksum))
return std::nullopt;
return deserializeSizeAndChecksum(size_and_checksum);
}
2022-04-19 18:15:27 +00:00
String BackupCoordinationDistributed::getNextArchiveSuffix()
{
auto zookeeper = get_zookeeper();
for (size_t attempt = 0; attempt != NUM_ATTEMPTS; ++attempt)
{
Coordination::Stat stat;
String current_suffix_str = zookeeper->get(zookeeper_path + "/current_archive_suffix", &stat);
UInt64 current_suffix = parseFromString<UInt64>(current_suffix_str);
current_suffix_str = fmt::format("{:03}", ++current_suffix); /// Outputs 001, 002, 003, ...
Coordination::Requests ops;
ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/current_archive_suffix", current_suffix_str, stat.version));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/archive_suffixes/" + current_suffix_str, "", zkutil::CreateMode::Persistent));
Coordination::Responses responses;
auto code = zookeeper->tryMulti(ops, responses);
if (code == Coordination::Error::ZOK)
return current_suffix_str;
bool is_last_attempt = (attempt == NUM_ATTEMPTS - 1);
if ((responses[0]->error != Coordination::Error::ZBADVERSION) || is_last_attempt)
throw zkutil::KeeperMultiException(code, ops, responses);
}
__builtin_unreachable();
}
2022-04-19 18:15:27 +00:00
Strings BackupCoordinationDistributed::getAllArchiveSuffixes() const
{
auto zookeeper = get_zookeeper();
return zookeeper->getChildren(zookeeper_path + "/archive_suffixes");
}
2022-04-19 18:15:27 +00:00
void BackupCoordinationDistributed::drop()
{
removeAllNodes();
}
}