use IDisk to do IO in Keeper's snapshots and logs

This commit is contained in:
LiuYangkuan 2023-02-01 18:50:43 +08:00
parent 099532ecc5
commit f29700bd2f
14 changed files with 244 additions and 159 deletions

View File

@ -9,6 +9,7 @@
#include <Poco/AutoPtr.h>
#include <Poco/Logger.h>
#include <Common/logger_useful.h>
#include <Disks/DiskLocal.h>
int mainEntryClickHouseKeeperConverter(int argc, char ** argv)
@ -51,7 +52,7 @@ int mainEntryClickHouseKeeperConverter(int argc, char ** argv)
DB::SnapshotMetadataPtr snapshot_meta = std::make_shared<DB::SnapshotMetadata>(storage.getZXID(), 1, std::make_shared<nuraft::cluster_config>());
DB::KeeperStorageSnapshot snapshot(&storage, snapshot_meta);
DB::KeeperSnapshotManager manager(options["output-dir"].as<std::string>(), 1, keeper_context);
DB::KeeperSnapshotManager manager(std::make_shared<DiskLocal>("Keeper-snapshots", options["output-dir"].as<std::string>(), 0), 1, keeper_context);
auto snp = manager.serializeSnapshotToBuffer(snapshot);
auto path = manager.serializeSnapshotBufferToDisk(*snp, storage.getZXID());
std::cout << "Snapshot serialized to path:" << path << std::endl;

View File

@ -31,12 +31,9 @@ namespace
constexpr auto DEFAULT_PREFIX = "changelog";
std::string formatChangelogPath(
const std::string & prefix, const std::string & name_prefix, uint64_t from_index, uint64_t to_index, const std::string & extension)
inline std::string formatChangelogPath(const std::string & name_prefix, uint64_t from_index, uint64_t to_index, const std::string & extension)
{
std::filesystem::path path(prefix);
path /= std::filesystem::path(fmt::format("{}_{}_{}.{}", name_prefix, from_index, to_index, extension));
return path;
return fmt::format("{}_{}_{}.{}", name_prefix, from_index, to_index, extension);
}
ChangelogFileDescriptionPtr getChangelogFileDescription(const std::filesystem::path & path)
@ -88,11 +85,11 @@ class ChangelogWriter
public:
ChangelogWriter(
std::map<uint64_t, ChangelogFileDescriptionPtr> & existing_changelogs_,
const std::filesystem::path & changelogs_dir_,
DiskPtr disk_,
LogFileSettings log_file_settings_)
: existing_changelogs(existing_changelogs_)
, log_file_settings(log_file_settings_)
, changelogs_dir(changelogs_dir_)
, disk(disk_)
, log(&Poco::Logger::get("Changelog"))
{
}
@ -109,7 +106,7 @@ public:
file_description->expectedEntriesCountInLog());
// we have a file we need to finalize first
if (tryGetFileBuffer() && prealloc_done)
if (tryGetFileBaseBuffer() && prealloc_done)
{
finalizeCurrentFile();
@ -121,18 +118,16 @@ public:
&& *last_index_written != current_file_description->to_log_index)
{
auto new_path = formatChangelogPath(
changelogs_dir,
current_file_description->prefix,
current_file_description->from_log_index,
*last_index_written,
current_file_description->extension);
std::filesystem::rename(current_file_description->path, new_path);
disk->moveFile(current_file_description->path, new_path);
current_file_description->path = std::move(new_path);
}
}
file_buf = std::make_unique<WriteBufferFromFile>(
file_description->path, DBMS_DEFAULT_BUFFER_SIZE, mode == WriteMode::Rewrite ? -1 : (O_APPEND | O_CREAT | O_WRONLY));
file_buf = disk->writeFile(file_description->path, DBMS_DEFAULT_BUFFER_SIZE, mode);
last_index_written.reset();
current_file_description = std::move(file_description);
@ -148,12 +143,15 @@ public:
}
}
bool isFileSet() const { return tryGetFileBuffer() != nullptr; }
/// There is bug when compressed_buffer has value, file_buf's ownership transfer to compressed_buffer
bool isFileSet() const
{
return compressed_buffer.get() != nullptr || file_buf.get() != nullptr;
}
bool appendRecord(ChangelogRecord && record)
{
const auto * file_buffer = tryGetFileBuffer();
const auto * file_buffer = tryGetFileBaseBuffer();
assert(file_buffer && current_file_description);
assert(record.header.index - getStartIndex() <= current_file_description->expectedEntriesCountInLog());
@ -207,7 +205,7 @@ public:
void flush()
{
auto * file_buffer = tryGetFileBuffer();
auto * file_buffer = tryGetFileBaseBuffer();
/// Fsync file system if needed
if (file_buffer && log_file_settings.force_sync)
file_buffer->sync();
@ -232,7 +230,6 @@ public:
new_description->extension += "." + toContentEncodingName(CompressionMethod::Zstd);
new_description->path = formatChangelogPath(
changelogs_dir,
new_description->prefix,
new_start_log_index,
new_start_log_index + log_file_settings.rotate_interval - 1,
@ -254,14 +251,13 @@ private:
void finalizeCurrentFile()
{
const auto * file_buffer = tryGetFileBuffer();
assert(file_buffer && prealloc_done);
assert(prealloc_done);
assert(current_file_description);
// compact can delete the file and we don't need to do anything
if (current_file_description->deleted)
{
LOG_WARNING(log, "Log {} is already deleted", file_buffer->getFileName());
LOG_WARNING(log, "Log {} is already deleted", current_file_description->path);
return;
}
@ -270,7 +266,8 @@ private:
flush();
if (log_file_settings.max_size != 0)
const auto * file_buffer = tryGetFileBuffer();
if (log_file_settings.max_size != 0 && file_buffer)
ftruncate(file_buffer->getFD(), initial_file_size + file_buffer->count());
if (log_file_settings.compress_logs)
@ -281,6 +278,8 @@ private:
WriteBuffer & getBuffer()
{
/// TODO: unify compressed_buffer and file_buf,
/// compressed_buffer can use its NestedBuffer directly if compress_logs=false
if (compressed_buffer)
return *compressed_buffer;
@ -310,10 +309,15 @@ private:
if (compressed_buffer)
return dynamic_cast<WriteBufferFromFile *>(compressed_buffer->getNestedBuffer());
if (file_buf)
return file_buf.get();
return dynamic_cast<WriteBufferFromFile *>(file_buf.get());
}
return nullptr;
WriteBufferFromFileBase * tryGetFileBaseBuffer()
{
if (compressed_buffer)
return dynamic_cast<WriteBufferFromFileBase *>(compressed_buffer->getNestedBuffer());
return file_buf.get();
}
void tryPreallocateForFile()
@ -325,13 +329,22 @@ private:
return;
}
const auto & file_buffer = getFileBuffer();
const auto * file_buffer = tryGetFileBuffer();
if (!file_buffer)
{
initial_file_size = 0;
prealloc_done = true;
LOG_WARNING(log, "Could not preallocate space on disk {} using fallocate", disk->getName());
return;
}
#ifdef OS_LINUX
{
int res = -1;
do
{
res = fallocate(file_buffer.getFD(), FALLOC_FL_KEEP_SIZE, 0, log_file_settings.max_size + log_file_settings.overallocate_size);
res = fallocate(file_buffer->getFD(), FALLOC_FL_KEEP_SIZE, 0, log_file_settings.max_size + log_file_settings.overallocate_size);
} while (res < 0 && errno == EINTR);
if (res != 0)
@ -346,7 +359,7 @@ private:
}
}
#endif
initial_file_size = getSizeFromFileDescriptor(file_buffer.getFD());
initial_file_size = getSizeFromFileDescriptor(file_buffer->getFD());
prealloc_done = true;
}
@ -354,7 +367,7 @@ private:
std::map<uint64_t, ChangelogFileDescriptionPtr> & existing_changelogs;
ChangelogFileDescriptionPtr current_file_description{nullptr};
std::unique_ptr<WriteBufferFromFile> file_buf;
std::unique_ptr<WriteBufferFromFileBase> file_buf;
std::optional<uint64_t> last_index_written;
size_t initial_file_size{0};
@ -364,7 +377,7 @@ private:
LogFileSettings log_file_settings;
const std::filesystem::path changelogs_dir;
DiskPtr disk;
Poco::Logger * const log;
};
@ -394,10 +407,12 @@ struct ChangelogReadResult
class ChangelogReader
{
public:
explicit ChangelogReader(const std::string & filepath_) : filepath(filepath_)
explicit ChangelogReader(DiskPtr disk_, const std::string & filepath_)
: disk(disk_)
, filepath(filepath_)
{
auto compression_method = chooseCompressionMethod(filepath, "");
auto read_buffer_from_file = std::make_unique<ReadBufferFromFile>(filepath);
auto read_buffer_from_file = disk->readFile(filepath);
read_buf = wrapReadBufferWithCompressionMethod(std::move(read_buffer_from_file), compression_method);
}
@ -493,37 +508,35 @@ public:
}
private:
DiskPtr disk;
std::string filepath;
std::unique_ptr<ReadBuffer> read_buf;
};
Changelog::Changelog(
const std::string & changelogs_dir_,
DiskPtr disk_,
Poco::Logger * log_,
LogFileSettings log_file_settings)
: changelogs_dir(changelogs_dir_)
, changelogs_detached_dir(changelogs_dir / "detached")
: disk(disk_)
, changelogs_detached_dir("detached")
, rotate_interval(log_file_settings.rotate_interval)
, log(log_)
, write_operations(std::numeric_limits<size_t>::max())
, append_completion_queue(std::numeric_limits<size_t>::max())
{
/// Load all files in changelog directory
namespace fs = std::filesystem;
if (!fs::exists(changelogs_dir))
fs::create_directories(changelogs_dir);
for (const auto & p : fs::directory_iterator(changelogs_dir))
for (auto it = disk->iterateDirectory(""); it->isValid(); it->next())
{
if (p == changelogs_detached_dir)
if (it->name() == changelogs_detached_dir)
continue;
auto file_description = getChangelogFileDescription(p.path());
auto file_description = getChangelogFileDescription(it->path());
existing_changelogs[file_description->from_log_index] = std::move(file_description);
}
if (existing_changelogs.empty())
LOG_WARNING(log, "No logs exists in {}. It's Ok if it's the first run of clickhouse-keeper.", changelogs_dir.generic_string());
LOG_WARNING(log, "No logs exists in {}. It's Ok if it's the first run of clickhouse-keeper.", disk->getPath());
clean_log_thread = ThreadFromGlobalPool([this] { cleanLogThread(); });
@ -532,7 +545,7 @@ Changelog::Changelog(
append_completion_thread = ThreadFromGlobalPool([this] { appendCompletionThread(); });
current_writer = std::make_unique<ChangelogWriter>(
existing_changelogs, changelogs_dir, log_file_settings);
existing_changelogs, disk, log_file_settings);
}
void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uint64_t logs_to_keep)
@ -604,7 +617,7 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin
break;
}
ChangelogReader reader(changelog_description.path);
ChangelogReader reader(disk, changelog_description.path);
last_log_read_result = reader.readChangelog(logs, start_to_read_from, log);
last_log_read_result->log_start_index = changelog_description.from_log_index;
@ -671,7 +684,7 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin
if (last_log_read_result->last_read_index == 0 || last_log_read_result->error) /// If it's broken log then remove it
{
LOG_INFO(log, "Removing chagelog {} because it's empty or read finished with error", description->path);
std::filesystem::remove(description->path);
disk->removeFile(description->path);
existing_changelogs.erase(last_log_read_result->log_start_index);
std::erase_if(logs, [last_log_read_result](const auto & item) { return item.first >= last_log_read_result->log_start_index; });
}
@ -691,6 +704,9 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin
void Changelog::initWriter(ChangelogFileDescriptionPtr description)
{
if (description->expectedEntriesCountInLog() != rotate_interval)
LOG_TRACE(log, "Looks like rotate_logs_interval was changed, current {}, expected entries in last log {}", rotate_interval, description->expectedEntriesCountInLog());
LOG_TRACE(log, "Continue to write into {}", description->path);
current_writer->setFile(std::move(description), WriteMode::Append);
}
@ -715,20 +731,20 @@ std::string getCurrentTimestampFolder()
void Changelog::removeExistingLogs(ChangelogIter begin, ChangelogIter end)
{
const auto timestamp_folder = changelogs_detached_dir / getCurrentTimestampFolder();
const auto timestamp_folder = (fs::path(changelogs_detached_dir) / getCurrentTimestampFolder()).generic_string();
for (auto itr = begin; itr != end;)
{
if (!std::filesystem::exists(timestamp_folder))
if (!disk->exists(timestamp_folder))
{
LOG_WARNING(log, "Moving broken logs to {}", timestamp_folder.generic_string());
std::filesystem::create_directories(timestamp_folder);
LOG_WARNING(log, "Moving broken logs to {}", timestamp_folder);
disk->createDirectories(timestamp_folder);
}
LOG_WARNING(log, "Removing changelog {}", itr->second->path);
const std::filesystem::path & path = itr->second->path;
const auto new_path = timestamp_folder / path.filename();
std::filesystem::rename(path, new_path);
disk->moveFile(path.generic_string(), new_path.generic_string());
itr = existing_changelogs.erase(itr);
}
}
@ -885,7 +901,7 @@ void Changelog::writeAt(uint64_t index, const LogEntryPtr & log_entry)
auto to_remove_itr = existing_changelogs.upper_bound(index);
for (auto itr = to_remove_itr; itr != existing_changelogs.end();)
{
std::filesystem::remove(itr->second->path);
disk->removeFile(itr->second->path);
itr = existing_changelogs.erase(itr);
}
}
@ -937,12 +953,19 @@ void Changelog::compact(uint64_t up_to_log_index)
/// If failed to push to queue for background removing, then we will remove it now
if (!log_files_to_delete_queue.tryPush(changelog_description.path, 1))
{
std::error_code ec;
std::filesystem::remove(changelog_description.path, ec);
if (ec)
LOG_WARNING(log, "Failed to remove changelog {} in compaction, error message: {}", changelog_description.path, ec.message());
else
LOG_INFO(log, "Removed changelog {} because of compaction", changelog_description.path);
try
{
disk->removeFile(itr->second->path);
LOG_INFO(log, "Removed changelog {} because of compaction.", itr->second->path);
}
catch (Exception & e)
{
LOG_WARNING(log, "Failed to remove changelog {} in compaction, error message: {}", itr->second->path, e.message());
}
catch (...)
{
tryLogCurrentException(log);
}
}
changelog_description.deleted = true;
@ -1135,11 +1158,19 @@ void Changelog::cleanLogThread()
std::string path;
while (log_files_to_delete_queue.pop(path))
{
std::error_code ec;
if (std::filesystem::remove(path, ec))
try
{
disk->removeFile(path);
LOG_INFO(log, "Removed changelog {} because of compaction.", path);
else
LOG_WARNING(log, "Failed to remove changelog {} in compaction, error message: {}", path, ec.message());
}
catch (Exception & e)
{
LOG_WARNING(log, "Failed to remove changelog {} in compaction, error message: {}", path, e.message());
}
catch (...)
{
tryLogCurrentException(log);
}
}
}

View File

@ -86,7 +86,7 @@ class Changelog
{
public:
Changelog(
const std::string & changelogs_dir_,
DiskPtr disk_,
Poco::Logger * log_,
LogFileSettings log_file_settings);
@ -168,8 +168,8 @@ private:
/// Clean useless log files in a background thread
void cleanLogThread();
const std::filesystem::path changelogs_dir;
const std::filesystem::path changelogs_detached_dir;
DiskPtr disk;
const String changelogs_detached_dir;
const uint64_t rotate_interval;
Poco::Logger * log;

View File

@ -1,13 +1,13 @@
#include <Coordination/KeeperLogStore.h>
#include <IO/CompressionMethod.h>
#include <Disks/DiskLocal.h>
namespace DB
{
KeeperLogStore::KeeperLogStore(
const std::string & changelogs_path, LogFileSettings log_file_settings)
KeeperLogStore::KeeperLogStore(DiskPtr disk_, LogFileSettings log_file_settings)
: log(&Poco::Logger::get("KeeperLogStore"))
, changelog(changelogs_path, log, log_file_settings)
, changelog(disk_, log, log_file_settings)
{
if (log_file_settings.force_sync)
LOG_INFO(log, "force_sync enabled");
@ -15,6 +15,11 @@ KeeperLogStore::KeeperLogStore(
LOG_INFO(log, "force_sync disabled");
}
KeeperLogStore::KeeperLogStore(const std::string & changelogs_path, LogFileSettings log_file_settings)
: KeeperLogStore(std::make_shared<DiskLocal>("Keeper-logs", changelogs_path, 0), log_file_settings)
{
}
uint64_t KeeperLogStore::start_index() const
{
std::lock_guard lock(changelog_lock);

View File

@ -14,6 +14,9 @@ namespace DB
class KeeperLogStore : public nuraft::log_store
{
public:
KeeperLogStore(DiskPtr disk_, LogFileSettings log_file_settings);
/// For gtest
KeeperLogStore(const std::string & changelogs_path, LogFileSettings log_file_settings);
/// Read log storage from filesystem starting from last_commited_log_index

View File

@ -25,6 +25,7 @@
#include <Common/ZooKeeper/ZooKeeperIO.h>
#include <Common/Stopwatch.h>
#include <Common/getMultipleKeysFromConfig.h>
#include <Disks/DiskLocal.h>
namespace DB
{
@ -121,20 +122,36 @@ KeeperServer::KeeperServer(
keeper_context->digest_enabled = config.getBool("keeper_server.digest_enabled", false);
keeper_context->ignore_system_path_on_startup = config.getBool("keeper_server.ignore_system_path_on_startup", false);
if (!fs::exists(configuration_and_settings_->snapshot_storage_path))
fs::create_directories(configuration_and_settings_->snapshot_storage_path);
auto snapshots_disk = std::make_shared<DiskLocal>("Keeper-snapshots", configuration_and_settings_->snapshot_storage_path, 0);
state_machine = nuraft::cs_new<KeeperStateMachine>(
responses_queue_,
snapshots_queue_,
configuration_and_settings_->snapshot_storage_path,
snapshots_disk,
coordination_settings,
keeper_context,
config.getBool("keeper_server.upload_snapshot_on_exit", true) ? &snapshot_manager_s3 : nullptr,
checkAndGetSuperdigest(configuration_and_settings_->super_digest));
auto state_path = fs::path(configuration_and_settings_->state_file_path).parent_path().generic_string();
auto state_file_name = fs::path(configuration_and_settings_->state_file_path).filename().generic_string();
if (!fs::exists(state_path))
fs::create_directories(state_path);
auto state_disk = std::make_shared<DiskLocal>("Keeper-state", state_path, 0);
if (!fs::exists(configuration_and_settings_->log_storage_path))
fs::create_directories(configuration_and_settings_->log_storage_path);
auto logs_disk = std::make_shared<DiskLocal>("Keeper-logs", configuration_and_settings_->log_storage_path, 0);
state_manager = nuraft::cs_new<KeeperStateManager>(
server_id,
"keeper_server",
configuration_and_settings_->log_storage_path,
configuration_and_settings_->state_file_path,
logs_disk,
state_disk,
state_file_name,
config,
coordination_settings);
}

View File

@ -16,6 +16,7 @@
#include <Coordination/KeeperContext.h>
#include <Coordination/KeeperConstants.h>
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <Disks/DiskLocal.h>
namespace DB
@ -507,39 +508,45 @@ KeeperSnapshotManager::KeeperSnapshotManager(
bool compress_snapshots_zstd_,
const std::string & superdigest_,
size_t storage_tick_time_)
: snapshots_path(snapshots_path_)
: KeeperSnapshotManager(
std::make_shared<DiskLocal>("Keeper-snapshots", snapshots_path_, 0),
snapshots_to_keep_,
keeper_context_,
compress_snapshots_zstd_,
superdigest_,
storage_tick_time_)
{
}
KeeperSnapshotManager::KeeperSnapshotManager(
DiskPtr disk_,
size_t snapshots_to_keep_,
const KeeperContextPtr & keeper_context_,
bool compress_snapshots_zstd_,
const std::string & superdigest_,
size_t storage_tick_time_)
: disk(disk_)
, snapshots_to_keep(snapshots_to_keep_)
, compress_snapshots_zstd(compress_snapshots_zstd_)
, superdigest(superdigest_)
, storage_tick_time(storage_tick_time_)
, keeper_context(keeper_context_)
{
namespace fs = std::filesystem;
if (!fs::exists(snapshots_path))
fs::create_directories(snapshots_path);
for (const auto & p : fs::directory_iterator(snapshots_path))
for (auto it = disk->iterateDirectory(""); it->isValid(); it->next())
{
const auto & path = p.path();
if (!path.has_filename())
const auto & name = it->name();
if (name.empty())
continue;
if (startsWith(path.filename(), "tmp_")) /// Unfinished tmp files
if (startsWith(name, "tmp_"))
{
std::filesystem::remove(p);
disk->removeFile(it->path());
continue;
}
/// Not snapshot file
if (!startsWith(path.filename(), "snapshot_"))
{
if (!startsWith(name, "snapshot_"))
continue;
}
size_t snapshot_up_to = getSnapshotPathUpToLogIdx(p.path());
existing_snapshots[snapshot_up_to] = p.path();
size_t snapshot_up_to = getSnapshotPathUpToLogIdx(name);
existing_snapshots[snapshot_up_to] = it->path();
}
removeOutdatedSnapshotsIfNeeded();
@ -552,19 +559,17 @@ std::string KeeperSnapshotManager::serializeSnapshotBufferToDisk(nuraft::buffer
auto snapshot_file_name = getSnapshotFileName(up_to_log_idx, compress_snapshots_zstd);
auto tmp_snapshot_file_name = "tmp_" + snapshot_file_name;
std::string tmp_snapshot_path = std::filesystem::path{snapshots_path} / tmp_snapshot_file_name;
std::string new_snapshot_path = std::filesystem::path{snapshots_path} / snapshot_file_name;
WriteBufferFromFile plain_buf(tmp_snapshot_path);
copyData(reader, plain_buf);
plain_buf.sync();
auto plain_buf = disk->writeFile(tmp_snapshot_file_name);
copyData(reader, *plain_buf);
plain_buf->sync();
std::filesystem::rename(tmp_snapshot_path, new_snapshot_path);
disk->moveFile(tmp_snapshot_file_name, snapshot_file_name);
existing_snapshots.emplace(up_to_log_idx, new_snapshot_path);
existing_snapshots.emplace(up_to_log_idx, snapshot_file_name);
removeOutdatedSnapshotsIfNeeded();
return new_snapshot_path;
return snapshot_file_name;
}
nuraft::ptr<nuraft::buffer> KeeperSnapshotManager::deserializeLatestSnapshotBufferFromDisk()
@ -578,7 +583,7 @@ nuraft::ptr<nuraft::buffer> KeeperSnapshotManager::deserializeLatestSnapshotBuff
}
catch (const DB::Exception &)
{
std::filesystem::remove(latest_itr->second);
disk->removeFile(latest_itr->second);
existing_snapshots.erase(latest_itr->first);
tryLogCurrentException(__PRETTY_FUNCTION__);
}
@ -591,8 +596,8 @@ nuraft::ptr<nuraft::buffer> KeeperSnapshotManager::deserializeSnapshotBufferFrom
{
const std::string & snapshot_path = existing_snapshots.at(up_to_log_idx);
WriteBufferFromNuraftBuffer writer;
ReadBufferFromFile reader(snapshot_path);
copyData(reader, writer);
auto reader = disk->readFile(snapshot_path);
copyData(*reader, writer);
return writer.getBuffer();
}
@ -664,7 +669,7 @@ void KeeperSnapshotManager::removeSnapshot(uint64_t log_idx)
auto itr = existing_snapshots.find(log_idx);
if (itr == existing_snapshots.end())
throw Exception(ErrorCodes::UNKNOWN_SNAPSHOT, "Unknown snapshot with log index {}", log_idx);
std::filesystem::remove(itr->second);
disk->removeFile(itr->second);
existing_snapshots.erase(itr);
}
@ -673,10 +678,8 @@ std::pair<std::string, std::error_code> KeeperSnapshotManager::serializeSnapshot
auto up_to_log_idx = snapshot.snapshot_meta->get_last_log_idx();
auto snapshot_file_name = getSnapshotFileName(up_to_log_idx, compress_snapshots_zstd);
auto tmp_snapshot_file_name = "tmp_" + snapshot_file_name;
std::string tmp_snapshot_path = std::filesystem::path{snapshots_path} / tmp_snapshot_file_name;
std::string new_snapshot_path = std::filesystem::path{snapshots_path} / snapshot_file_name;
auto writer = std::make_unique<WriteBufferFromFile>(tmp_snapshot_path, O_WRONLY | O_TRUNC | O_CREAT | O_CLOEXEC | O_APPEND);
auto writer = disk->writeFile(tmp_snapshot_file_name);
std::unique_ptr<WriteBuffer> compressed_writer;
if (compress_snapshots_zstd)
compressed_writer = wrapWriteBufferWithCompressionMethod(std::move(writer), CompressionMethod::Zstd, 3);
@ -688,13 +691,21 @@ std::pair<std::string, std::error_code> KeeperSnapshotManager::serializeSnapshot
compressed_writer->sync();
std::error_code ec;
std::filesystem::rename(tmp_snapshot_path, new_snapshot_path, ec);
if (!ec)
try
{
existing_snapshots.emplace(up_to_log_idx, new_snapshot_path);
removeOutdatedSnapshotsIfNeeded();
disk->moveFile(tmp_snapshot_file_name, snapshot_file_name);
}
return {new_snapshot_path, ec};
catch (fs::filesystem_error & e)
{
ec = e.code();
return {snapshot_file_name, ec};
}
existing_snapshots.emplace(up_to_log_idx, snapshot_file_name);
removeOutdatedSnapshotsIfNeeded();
return {snapshot_file_name, ec};
}
}

View File

@ -6,6 +6,7 @@
#include <IO/WriteBuffer.h>
#include <libnuraft/nuraft.hxx>
#include <Coordination/KeeperContext.h>
#include <Disks/IDisk.h>
namespace DB
{
@ -97,6 +98,15 @@ using SnapshotMetaAndStorage = std::pair<SnapshotMetadataPtr, KeeperStoragePtr>;
class KeeperSnapshotManager
{
public:
KeeperSnapshotManager(
DiskPtr disk_,
size_t snapshots_to_keep_,
const KeeperContextPtr & keeper_context_,
bool compress_snapshots_zstd_ = true,
const std::string & superdigest_ = "",
size_t storage_tick_time_ = 500);
/// For gtest
KeeperSnapshotManager(
const std::string & snapshots_path_,
size_t snapshots_to_keep_,
@ -144,9 +154,15 @@ public:
if (!existing_snapshots.empty())
{
const auto & path = existing_snapshots.at(getLatestSnapshotIndex());
std::error_code ec;
if (std::filesystem::exists(path, ec))
return path;
try
{
if (disk->exists(path))
return path;
}
catch (...)
{
}
}
return "";
}
@ -158,7 +174,7 @@ private:
/// ZSTD codec.
static bool isZstdCompressed(nuraft::ptr<nuraft::buffer> buffer);
const std::string snapshots_path;
DiskPtr disk;
/// How many snapshots to keep before remove
const size_t snapshots_to_keep;
/// All existing snapshots in our path (log_index -> path)

View File

@ -41,14 +41,14 @@ namespace
KeeperStateMachine::KeeperStateMachine(
ResponsesQueue & responses_queue_,
SnapshotsQueue & snapshots_queue_,
const std::string & snapshots_path_,
DiskPtr disk_,
const CoordinationSettingsPtr & coordination_settings_,
const KeeperContextPtr & keeper_context_,
KeeperSnapshotManagerS3 * snapshot_manager_s3_,
const std::string & superdigest_)
: coordination_settings(coordination_settings_)
, snapshot_manager(
snapshots_path_,
disk_,
coordination_settings->snapshots_to_keep,
keeper_context_,
coordination_settings->compress_snapshots_with_zstd_format,

View File

@ -25,7 +25,7 @@ public:
KeeperStateMachine(
ResponsesQueue & responses_queue_,
SnapshotsQueue & snapshots_queue_,
const std::string & snapshots_path_,
DiskPtr disk_,
const CoordinationSettingsPtr & coordination_settings_,
const KeeperContextPtr & keeper_context_,
KeeperSnapshotManagerS3 * snapshot_manager_s3_,

View File

@ -7,6 +7,7 @@
#include <Common/isLocalAddress.h>
#include <IO/ReadHelpers.h>
#include <Common/getMultipleKeysFromConfig.h>
#include <Disks/DiskLocal.h>
namespace DB
{
@ -214,8 +215,8 @@ KeeperStateManager::KeeperStateManager(
int server_id_, const std::string & host, int port, const std::string & logs_path, const std::string & state_file_path)
: my_server_id(server_id_)
, secure(false)
, log_store(nuraft::cs_new<KeeperLogStore>(logs_path, LogFileSettings{.force_sync =false, .compress_logs = false, .rotate_interval = 5000}))
, server_state_path(state_file_path)
, log_store(nuraft::cs_new<KeeperLogStore>(std::make_shared<DiskLocal>("Keeper-logs", logs_path, 0), LogFileSettings{.force_sync =false, .compress_logs = false, .rotate_interval = 5000}))
, server_state_file_name(fs::path(state_file_path).filename().generic_string())
, logger(&Poco::Logger::get("KeeperStateManager"))
{
auto peer_config = nuraft::cs_new<nuraft::srv_config>(my_server_id, host + ":" + std::to_string(port));
@ -228,8 +229,9 @@ KeeperStateManager::KeeperStateManager(
KeeperStateManager::KeeperStateManager(
int my_server_id_,
const std::string & config_prefix_,
const std::string & log_storage_path,
const std::string & state_file_path,
DiskPtr log_disk_,
DiskPtr state_disk_,
const std::string & server_state_file_name_,
const Poco::Util::AbstractConfiguration & config,
const CoordinationSettingsPtr & coordination_settings)
: my_server_id(my_server_id_)
@ -237,7 +239,7 @@ KeeperStateManager::KeeperStateManager(
, config_prefix(config_prefix_)
, configuration_wrapper(parseServersConfiguration(config, false))
, log_store(nuraft::cs_new<KeeperLogStore>(
log_storage_path,
log_disk_,
LogFileSettings
{
.force_sync = coordination_settings->force_sync,
@ -246,7 +248,8 @@ KeeperStateManager::KeeperStateManager(
.max_size = coordination_settings->max_log_file_size,
.overallocate_size = coordination_settings->log_file_overallocate_size
}))
, server_state_path(state_file_path)
, disk(state_disk_)
, server_state_file_name(server_state_file_name_)
, logger(&Poco::Logger::get("KeeperStateManager"))
{
}
@ -285,11 +288,11 @@ void KeeperStateManager::save_config(const nuraft::cluster_config & config)
configuration_wrapper.cluster_config = nuraft::cluster_config::deserialize(*buf);
}
const std::filesystem::path & KeeperStateManager::getOldServerStatePath()
const String & KeeperStateManager::getOldServerStatePath()
{
static auto old_path = [this]
{
return server_state_path.parent_path() / (server_state_path.filename().generic_string() + "-OLD");
return server_state_file_name + "-OLD";
}();
return old_path;
@ -310,25 +313,24 @@ void KeeperStateManager::save_state(const nuraft::srv_state & state)
{
const auto & old_path = getOldServerStatePath();
if (std::filesystem::exists(server_state_path))
std::filesystem::rename(server_state_path, old_path);
if (disk->exists(server_state_file_name))
disk->moveFile(server_state_file_name, old_path);
WriteBufferFromFile server_state_file(server_state_path, DBMS_DEFAULT_BUFFER_SIZE, O_TRUNC | O_CREAT | O_WRONLY);
auto server_state_file = disk->writeFile(server_state_file_name);
auto buf = state.serialize();
// calculate checksum
SipHash hash;
hash.update(current_server_state_version);
hash.update(reinterpret_cast<const char *>(buf->data_begin()), buf->size());
writeIntBinary(hash.get64(), server_state_file);
writeIntBinary(hash.get64(), *server_state_file);
writeIntBinary(static_cast<uint8_t>(current_server_state_version), server_state_file);
writeIntBinary(static_cast<uint8_t>(current_server_state_version), *server_state_file);
server_state_file.write(reinterpret_cast<const char *>(buf->data_begin()), buf->size());
server_state_file.sync();
server_state_file.close();
server_state_file->write(reinterpret_cast<const char *>(buf->data_begin()), buf->size());
server_state_file->sync();
std::filesystem::remove(old_path);
disk->removeFileIfExists(old_path);
}
nuraft::ptr<nuraft::srv_state> KeeperStateManager::read_state()
@ -339,22 +341,22 @@ nuraft::ptr<nuraft::srv_state> KeeperStateManager::read_state()
{
try
{
ReadBufferFromFile read_buf(path);
auto content_size = read_buf.getFileSize();
auto read_buf = disk->readFile(path);
auto content_size = read_buf->getFileSize();
if (content_size == 0)
return nullptr;
uint64_t read_checksum{0};
readIntBinary(read_checksum, read_buf);
readIntBinary(read_checksum, *read_buf);
uint8_t version;
readIntBinary(version, read_buf);
readIntBinary(version, *read_buf);
auto buffer_size = content_size - sizeof read_checksum - sizeof version;
auto state_buf = nuraft::buffer::alloc(buffer_size);
read_buf.readStrict(reinterpret_cast<char *>(state_buf->data_begin()), buffer_size);
read_buf->readStrict(reinterpret_cast<char *>(state_buf->data_begin()), buffer_size);
SipHash hash;
hash.update(version);
@ -364,15 +366,15 @@ nuraft::ptr<nuraft::srv_state> KeeperStateManager::read_state()
{
constexpr auto error_format = "Invalid checksum while reading state from {}. Got {}, expected {}";
#ifdef NDEBUG
LOG_ERROR(logger, error_format, path.generic_string(), hash.get64(), read_checksum);
LOG_ERROR(logger, error_format, path, hash.get64(), read_checksum);
return nullptr;
#else
throw Exception(ErrorCodes::CORRUPTED_DATA, error_format, path.generic_string(), hash.get64(), read_checksum);
throw Exception(ErrorCodes::CORRUPTED_DATA, error_format, disk->getPath() + path, hash.get64(), read_checksum);
#endif
}
auto state = nuraft::srv_state::deserialize(*state_buf);
LOG_INFO(logger, "Read state from {}", path.generic_string());
LOG_INFO(logger, "Read state from {}", disk->getPath() + path);
return state;
}
catch (const std::exception & e)
@ -383,37 +385,34 @@ nuraft::ptr<nuraft::srv_state> KeeperStateManager::read_state()
throw;
}
LOG_ERROR(logger, "Failed to deserialize state from {}", path.generic_string());
LOG_ERROR(logger, "Failed to deserialize state from {}", disk->getPath() + path);
return nullptr;
}
};
if (std::filesystem::exists(server_state_path))
if (disk->exists(server_state_file_name))
{
auto state = try_read_file(server_state_path);
auto state = try_read_file(server_state_file_name);
if (state)
{
if (std::filesystem::exists(old_path))
std::filesystem::remove(old_path);
disk->removeFileIfExists(old_path);
return state;
}
std::filesystem::remove(server_state_path);
disk->removeFile(server_state_file_name);
}
if (std::filesystem::exists(old_path))
if (disk->exists(old_path))
{
auto state = try_read_file(old_path);
if (state)
{
std::filesystem::rename(old_path, server_state_path);
disk->moveFile(old_path, server_state_file_name);
return state;
}
std::filesystem::remove(old_path);
disk->removeFile(old_path);
}
LOG_WARNING(logger, "No state was read");

View File

@ -39,7 +39,8 @@ public:
KeeperStateManager(
int server_id_,
const std::string & config_prefix_,
const std::string & log_storage_path,
DiskPtr logs_disk_,
DiskPtr state_disk_,
const std::string & state_file_path,
const Poco::Util::AbstractConfiguration & config,
const CoordinationSettingsPtr & coordination_settings);
@ -111,7 +112,7 @@ public:
ConfigUpdateActions getConfigurationDiff(const Poco::Util::AbstractConfiguration & config) const;
private:
const std::filesystem::path & getOldServerStatePath();
const String & getOldServerStatePath();
/// Wrapper struct for Keeper cluster config. We parse this
/// info from XML files.
@ -136,7 +137,8 @@ private:
nuraft::ptr<KeeperLogStore> log_store;
const std::filesystem::path server_state_path;
DiskPtr disk;
const String server_state_file_name;
Poco::Logger * logger;

View File

@ -11,7 +11,7 @@ namespace ErrorCodes
}
ZstdDeflatingAppendableWriteBuffer::ZstdDeflatingAppendableWriteBuffer(
std::unique_ptr<WriteBufferFromFile> out_,
std::unique_ptr<WriteBufferFromFileBase> out_,
int compression_level,
bool append_to_existing_file_,
size_t buf_size,

View File

@ -29,7 +29,7 @@ public:
static inline constexpr ZSTDLastBlock ZSTD_CORRECT_TERMINATION_LAST_BLOCK = {0x01, 0x00, 0x00};
ZstdDeflatingAppendableWriteBuffer(
std::unique_ptr<WriteBufferFromFile> out_,
std::unique_ptr<WriteBufferFromFileBase> out_,
int compression_level,
bool append_to_existing_file_,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
@ -68,7 +68,7 @@ private:
/// Adding zstd empty block (ZSTD_CORRECT_TERMINATION_LAST_BLOCK) to out.working_buffer
void addEmptyBlock();
std::unique_ptr<WriteBufferFromFile> out;
std::unique_ptr<WriteBufferFromFileBase> out;
bool append_to_existing_file = false;
ZSTD_CCtx * cctx;