Add support for changelog

This commit is contained in:
Antonio Andelic 2023-05-24 07:36:39 +00:00
parent bde2cf96b1
commit 161afea266
14 changed files with 277 additions and 145 deletions

View File

@ -44,8 +44,6 @@
#include <Disks/registerDisks.h>
#include <filesystem>
int mainEntryClickHouseKeeper(int argc, char ** argv)
{

View File

@ -3,17 +3,17 @@
#include <Disks/DiskLocal.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteHelpers.h>
#include <IO/ZstdDeflatingAppendableWriteBuffer.h>
#include <base/errnoToString.h>
#include <boost/algorithm/string/join.hpp>
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/trim.hpp>
#include <Common/filesystemHelpers.h>
#include <Common/Exception.h>
#include <Common/SipHash.h>
#include <Common/filesystemHelpers.h>
#include <Common/logger_useful.h>
#include <IO/WriteBufferFromFile.h>
#include <base/errnoToString.h>
namespace DB
@ -29,50 +29,58 @@ namespace ErrorCodes
namespace
{
void moveFileBetweenDisks(DiskPtr disk_from, ChangelogFileDescriptionPtr description, DiskPtr disk_to, const std::string & path_to)
{
disk_from->copyFile(description->path, *disk_to, path_to, {});
disk_from->removeFile(description->path);
description->path = path_to;
description->disk = disk_to;
}
constexpr auto DEFAULT_PREFIX = "changelog";
constexpr auto DEFAULT_PREFIX = "changelog";
inline std::string formatChangelogPath(const std::string & name_prefix, uint64_t from_index, uint64_t to_index, const std::string & extension)
{
return fmt::format("{}_{}_{}.{}", name_prefix, from_index, to_index, extension);
}
inline std::string
formatChangelogPath(const std::string & name_prefix, uint64_t from_index, uint64_t to_index, const std::string & extension)
{
return fmt::format("{}_{}_{}.{}", name_prefix, from_index, to_index, extension);
}
ChangelogFileDescriptionPtr getChangelogFileDescription(const std::filesystem::path & path)
{
// we can have .bin.zstd so we cannot use std::filesystem stem and extension
std::string filename_with_extension = path.filename();
std::string_view filename_with_extension_view = filename_with_extension;
ChangelogFileDescriptionPtr getChangelogFileDescription(const std::filesystem::path & path)
{
// we can have .bin.zstd so we cannot use std::filesystem stem and extension
std::string filename_with_extension = path.filename();
std::string_view filename_with_extension_view = filename_with_extension;
auto first_dot = filename_with_extension.find('.');
if (first_dot == std::string::npos)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Invalid changelog file {}", path.generic_string());
auto first_dot = filename_with_extension.find('.');
if (first_dot == std::string::npos)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Invalid changelog file {}", path.generic_string());
Strings filename_parts;
boost::split(filename_parts, filename_with_extension_view.substr(0, first_dot), boost::is_any_of("_"));
if (filename_parts.size() < 3)
throw Exception(ErrorCodes::CORRUPTED_DATA, "Invalid changelog {}", path.generic_string());
Strings filename_parts;
boost::split(filename_parts, filename_with_extension_view.substr(0, first_dot), boost::is_any_of("_"));
if (filename_parts.size() < 3)
throw Exception(ErrorCodes::CORRUPTED_DATA, "Invalid changelog {}", path.generic_string());
auto result = std::make_shared<ChangelogFileDescription>();
result->prefix = filename_parts[0];
result->from_log_index = parse<uint64_t>(filename_parts[1]);
result->to_log_index = parse<uint64_t>(filename_parts[2]);
result->extension = std::string(filename_with_extension.substr(first_dot + 1));
result->path = path.generic_string();
return result;
}
auto result = std::make_shared<ChangelogFileDescription>();
result->prefix = filename_parts[0];
result->from_log_index = parse<uint64_t>(filename_parts[1]);
result->to_log_index = parse<uint64_t>(filename_parts[2]);
result->extension = std::string(filename_with_extension.substr(first_dot + 1));
result->path = path.generic_string();
return result;
}
Checksum computeRecordChecksum(const ChangelogRecord & record)
{
SipHash hash;
hash.update(record.header.version);
hash.update(record.header.index);
hash.update(record.header.term);
hash.update(record.header.value_type);
hash.update(record.header.blob_size);
if (record.header.blob_size != 0)
hash.update(reinterpret_cast<char *>(record.blob->data_begin()), record.blob->size());
return hash.get64();
}
Checksum computeRecordChecksum(const ChangelogRecord & record)
{
SipHash hash;
hash.update(record.header.version);
hash.update(record.header.index);
hash.update(record.header.term);
hash.update(record.header.value_type);
hash.update(record.header.blob_size);
if (record.header.blob_size != 0)
hash.update(reinterpret_cast<char *>(record.blob->data_begin()), record.blob->size());
return hash.get64();
}
}
@ -117,25 +125,55 @@ public:
// if we wrote at least 1 log in the log file we can rename the file to reflect correctly the
// contained logs
// file can be deleted from disk earlier by compaction
if (!current_file_description->deleted && last_index_written
&& *last_index_written != current_file_description->to_log_index)
if (!current_file_description->deleted)
{
auto new_path = formatChangelogPath(
current_file_description->prefix,
current_file_description->from_log_index,
*last_index_written,
current_file_description->extension);
disk->moveFile(current_file_description->path, new_path);
current_file_description->path = std::move(new_path);
auto log_disk = current_file_description->disk;
const auto & path = current_file_description->path;
std::string new_path = path;
if (last_index_written && *last_index_written != current_file_description->to_log_index)
{
new_path = formatChangelogPath(
current_file_description->prefix,
current_file_description->from_log_index,
*last_index_written,
current_file_description->extension);
}
if (disk == log_disk)
{
if (path != new_path)
{
try
{
disk->moveFile(path, new_path);
}
catch (...)
{
tryLogCurrentException(log, fmt::format("File rename failed on disk {}", disk->getName()));
}
current_file_description->path = std::move(new_path);
}
}
else
{
moveFileBetweenDisks(log_disk, current_file_description, disk, new_path);
}
}
}
file_buf = disk->writeFile(file_description->path, DBMS_DEFAULT_BUFFER_SIZE, mode);
auto current_log_disk = getCurrentLogDisk();
assert(file_description->disk == current_log_disk);
file_buf = current_log_disk->writeFile(file_description->path, DBMS_DEFAULT_BUFFER_SIZE, mode);
assert(file_buf);
last_index_written.reset();
current_file_description = std::move(file_description);
if (log_file_settings.compress_logs)
compressed_buffer = std::make_unique<ZstdDeflatingAppendableWriteBuffer>(std::move(file_buf), /* compression level = */ 3, /* append_to_existing_file_ = */ mode == WriteMode::Append);
compressed_buffer = std::make_unique<ZstdDeflatingAppendableWriteBuffer>(
std::move(file_buf),
/* compressi)on level = */ 3,
/* append_to_existing_file_ = */ mode == WriteMode::Append,
[current_log_disk, path = current_file_description->path] { return current_log_disk->readFile(path); });
prealloc_done = false;
}
@ -147,10 +185,7 @@ public:
}
/// There is bug when compressed_buffer has value, file_buf's ownership transfer to compressed_buffer
bool isFileSet() const
{
return compressed_buffer != nullptr || file_buf != nullptr;
}
bool isFileSet() const { return compressed_buffer != nullptr || file_buf != nullptr; }
bool appendRecord(ChangelogRecord && record)
{
@ -236,6 +271,7 @@ public:
new_description->from_log_index = new_start_log_index;
new_description->to_log_index = new_start_log_index + log_file_settings.rotate_interval - 1;
new_description->extension = "bin";
new_description->disk = getCurrentLogDisk();
if (log_file_settings.compress_logs)
new_description->extension += "." + toContentEncodingName(CompressionMethod::Zstd);
@ -259,7 +295,6 @@ public:
}
private:
void finalizeCurrentFile()
{
assert(prealloc_done);
@ -279,14 +314,13 @@ private:
const auto * file_buffer = tryGetFileBuffer();
if (log_file_settings.max_size != 0 && isLocalDisk())
if (log_file_settings.max_size != 0 && file_buffer)
{
int res = -1;
do
{
res = ftruncate(file_buffer->getFD(), initial_file_size + file_buffer->count());
}
while (res < 0 && errno == EINTR);
} while (res < 0 && errno == EINTR);
if (res != 0)
LOG_WARNING(log, "Could not ftruncate file. Error: {}, errno: {}", errnoToString(), errno);
@ -321,10 +355,7 @@ private:
return *file_buffer;
}
const WriteBufferFromFile * tryGetFileBuffer() const
{
return const_cast<ChangelogWriter *>(this)->tryGetFileBuffer();
}
const WriteBufferFromFile * tryGetFileBuffer() const { return const_cast<ChangelogWriter *>(this)->tryGetFileBuffer(); }
WriteBufferFromFile * tryGetFileBuffer()
{
@ -344,30 +375,22 @@ private:
void tryPreallocateForFile()
{
if (log_file_settings.max_size == 0)
{
initial_file_size = 0;
prealloc_done = true;
return;
}
const auto * file_buffer = tryGetFileBuffer();
if (!file_buffer)
if (log_file_settings.max_size == 0 || !file_buffer)
{
initial_file_size = 0;
prealloc_done = true;
LOG_WARNING(log, "Could not preallocate space on disk {} using fallocate", getDisk()->getName());
return;
}
#ifdef OS_LINUX
if (isLocalDisk())
{
int res = -1;
do
{
res = fallocate(file_buffer->getFD(), FALLOC_FL_KEEP_SIZE, 0, log_file_settings.max_size + log_file_settings.overallocate_size);
res = fallocate(
file_buffer->getFD(), FALLOC_FL_KEEP_SIZE, 0, log_file_settings.max_size + log_file_settings.overallocate_size);
} while (res < 0 && errno == EINTR);
if (res != 0)
@ -387,15 +410,11 @@ private:
prealloc_done = true;
}
DiskPtr getDisk() const
{
return keeper_context->getLogDisk();
}
DiskPtr getCurrentLogDisk() const { return keeper_context->getCurrentLogDisk(); }
bool isLocalDisk() const
{
return dynamic_cast<DiskLocal *>(getDisk().get()) != nullptr;
}
DiskPtr getDisk() const { return keeper_context->getLogDisk(); }
bool isLocalDisk() const { return dynamic_cast<DiskLocal *>(getDisk().get()) != nullptr; }
std::map<uint64_t, ChangelogFileDescriptionPtr> & existing_changelogs;
@ -440,9 +459,7 @@ struct ChangelogReadResult
class ChangelogReader
{
public:
explicit ChangelogReader(DiskPtr disk_, const std::string & filepath_)
: disk(disk_)
, filepath(filepath_)
explicit ChangelogReader(DiskPtr disk_, const std::string & filepath_) : disk(disk_), filepath(filepath_)
{
auto compression_method = chooseCompressionMethod(filepath, "");
auto read_buffer_from_file = disk->readFile(filepath);
@ -546,10 +563,7 @@ private:
std::unique_ptr<ReadBuffer> read_buf;
};
Changelog::Changelog(
Poco::Logger * log_,
LogFileSettings log_file_settings,
KeeperContextPtr keeper_context_)
Changelog::Changelog(Poco::Logger * log_, LogFileSettings log_file_settings, KeeperContextPtr keeper_context_)
: changelogs_detached_dir("detached")
, rotate_interval(log_file_settings.rotate_interval)
, log(log_)
@ -557,18 +571,30 @@ Changelog::Changelog(
, append_completion_queue(std::numeric_limits<size_t>::max())
, keeper_context(std::move(keeper_context_))
{
/// Load all files in changelog directory
/// Load all files on changelog disks
const auto load_from_disk = [&](const auto & disk)
{
for (auto it = disk->iterateDirectory(""); it->isValid(); it->next())
{
if (it->name() == changelogs_detached_dir)
continue;
auto file_description = getChangelogFileDescription(it->path());
file_description->disk = disk;
auto [changelog_it, inserted] = existing_changelogs.insert_or_assign(file_description->from_log_index, std::move(file_description));
if (!inserted)
LOG_WARNING(log, "Found duplicate entries for {}, will use the entry from {}", changelog_it->second->path, disk->getName());
}
};
auto disk = getDisk();
load_from_disk(disk);
for (auto it = disk->iterateDirectory(""); it->isValid(); it->next())
{
if (it->name() == changelogs_detached_dir)
continue;
auto file_description = getChangelogFileDescription(it->path());
existing_changelogs[file_description->from_log_index] = std::move(file_description);
}
auto current_log_disk = getCurrentLogDisk();
load_from_disk(current_log_disk);
if (existing_changelogs.empty())
LOG_WARNING(log, "No logs exists in {}. It's Ok if it's the first run of clickhouse-keeper.", disk->getPath());
@ -579,8 +605,7 @@ Changelog::Changelog(
append_completion_thread = ThreadFromGlobalPool([this] { appendCompletionThread(); });
current_writer = std::make_unique<ChangelogWriter>(
existing_changelogs, keeper_context, log_file_settings);
current_writer = std::make_unique<ChangelogWriter>(existing_changelogs, keeper_context, log_file_settings);
}
void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uint64_t logs_to_keep)
@ -652,7 +677,7 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin
break;
}
ChangelogReader reader(getDisk(), changelog_description.path);
ChangelogReader reader(changelog_description.disk, changelog_description.path);
last_log_read_result = reader.readChangelog(logs, start_to_read_from, log);
last_log_read_result->log_start_index = changelog_description.from_log_index;
@ -713,13 +738,13 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin
assert(existing_changelogs.find(last_log_read_result->log_start_index) != existing_changelogs.end());
assert(existing_changelogs.find(last_log_read_result->log_start_index)->first == existing_changelogs.rbegin()->first);
/// Continue to write into incomplete existing log if it doesn't finished with error
/// Continue to write into incomplete existing log if it doesn't finish with error
const auto & description = existing_changelogs[last_log_read_result->log_start_index];
if (last_log_read_result->last_read_index == 0 || last_log_read_result->error) /// If it's broken log then remove it
{
LOG_INFO(log, "Removing chagelog {} because it's empty or read finished with error", description->path);
getDisk()->removeFile(description->path);
description->disk->removeFile(description->path);
existing_changelogs.erase(last_log_read_result->log_start_index);
std::erase_if(logs, [last_log_read_result](const auto & item) { return item.first >= last_log_read_result->log_start_index; });
}
@ -728,6 +753,16 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin
initWriter(description);
}
}
else if (last_log_read_result.has_value())
{
/// check if we need to move it to another disk
auto current_log_disk = getCurrentLogDisk();
auto disk = getDisk();
auto & description = existing_changelogs.at(last_log_read_result->log_start_index);
if (current_log_disk != disk && current_log_disk == description->disk)
moveFileBetweenDisks(current_log_disk, description, disk, description->path);
}
/// Start new log if we don't initialize writer from previous log. All logs can be "complete".
if (!current_writer->isFileSet())
@ -740,27 +775,37 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin
void Changelog::initWriter(ChangelogFileDescriptionPtr description)
{
if (description->expectedEntriesCountInLog() != rotate_interval)
LOG_TRACE(log, "Looks like rotate_logs_interval was changed, current {}, expected entries in last log {}", rotate_interval, description->expectedEntriesCountInLog());
LOG_TRACE(
log,
"Looks like rotate_logs_interval was changed, current {}, expected entries in last log {}",
rotate_interval,
description->expectedEntriesCountInLog());
LOG_TRACE(log, "Continue to write into {}", description->path);
auto log_disk = description->disk;
auto current_log_disk = getCurrentLogDisk();
if (log_disk != current_log_disk)
moveFileBetweenDisks(log_disk, description, current_log_disk, description->path);
current_writer->setFile(std::move(description), WriteMode::Append);
}
namespace
{
std::string getCurrentTimestampFolder()
{
const auto timestamp = LocalDateTime{std::time(nullptr)};
return fmt::format(
"{:02}{:02}{:02}T{:02}{:02}{:02}",
timestamp.year(),
timestamp.month(),
timestamp.day(),
timestamp.hour(),
timestamp.minute(),
timestamp.second());
}
std::string getCurrentTimestampFolder()
{
const auto timestamp = LocalDateTime{std::time(nullptr)};
return fmt::format(
"{:02}{:02}{:02}T{:02}{:02}{:02}",
timestamp.year(),
timestamp.month(),
timestamp.day(),
timestamp.hour(),
timestamp.minute(),
timestamp.second());
}
}
@ -769,6 +814,11 @@ DiskPtr Changelog::getDisk() const
return keeper_context->getLogDisk();
}
DiskPtr Changelog::getCurrentLogDisk() const
{
return keeper_context->getCurrentLogDisk();
}
void Changelog::removeExistingLogs(ChangelogIter begin, ChangelogIter end)
{
auto disk = getDisk();
@ -786,7 +836,23 @@ void Changelog::removeExistingLogs(ChangelogIter begin, ChangelogIter end)
LOG_WARNING(log, "Removing changelog {}", itr->second->path);
const std::filesystem::path & path = itr->second->path;
const auto new_path = timestamp_folder / path.filename();
disk->moveFile(path.generic_string(), new_path.generic_string());
auto changelog_disk = itr->second->disk;
if (changelog_disk == disk)
{
try
{
disk->moveFile(path.generic_string(), new_path.generic_string());
}
catch (const DB::Exception & e)
{
if (e.code() == DB::ErrorCodes::NOT_IMPLEMENTED)
moveFileBetweenDisks(changelog_disk, itr->second, disk, new_path);
}
}
else
moveFileBetweenDisks(changelog_disk, itr->second, disk, new_path);
itr = existing_changelogs.erase(itr);
}
}
@ -921,7 +987,6 @@ void Changelog::writeAt(uint64_t index, const LogEntryPtr & log_entry)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Changelog must be initialized before writing records");
{
std::lock_guard lock(writer_mutex);
/// This write_at require to overwrite everything in this file and also in previous file(s)
const bool go_to_previous_file = index < current_writer->getStartIndex();
@ -937,13 +1002,18 @@ void Changelog::writeAt(uint64_t index, const LogEntryPtr & log_entry)
else
description = std::prev(index_changelog)->second;
auto log_disk = description->disk;
auto current_log_disk = getCurrentLogDisk();
if (log_disk != current_log_disk)
moveFileBetweenDisks(log_disk, description, current_log_disk, description->path);
current_writer->setFile(std::move(description), WriteMode::Append);
/// Remove all subsequent files if overwritten something in previous one
auto to_remove_itr = existing_changelogs.upper_bound(index);
for (auto itr = to_remove_itr; itr != existing_changelogs.end();)
{
getDisk()->removeFile(itr->second->path);
itr->second->disk->removeFile(itr->second->path);
itr = existing_changelogs.erase(itr);
}
}
@ -993,16 +1063,17 @@ void Changelog::compact(uint64_t up_to_log_index)
LOG_INFO(log, "Removing changelog {} because of compaction", changelog_description.path);
/// If failed to push to queue for background removing, then we will remove it now
if (!log_files_to_delete_queue.tryPush(changelog_description.path, 1))
if (!log_files_to_delete_queue.tryPush({changelog_description.path, changelog_description.disk}, 1))
{
try
{
getDisk()->removeFile(itr->second->path);
LOG_INFO(log, "Removed changelog {} because of compaction.", itr->second->path);
changelog_description.disk->removeFile(changelog_description.path);
LOG_INFO(log, "Removed changelog {} because of compaction.", changelog_description.path);
}
catch (Exception & e)
{
LOG_WARNING(log, "Failed to remove changelog {} in compaction, error message: {}", itr->second->path, e.message());
LOG_WARNING(
log, "Failed to remove changelog {} in compaction, error message: {}", changelog_description.path, e.message());
}
catch (...)
{
@ -1197,12 +1268,13 @@ Changelog::~Changelog()
void Changelog::cleanLogThread()
{
std::string path;
while (log_files_to_delete_queue.pop(path))
std::pair<std::string, DiskPtr> path_with_disk;
while (log_files_to_delete_queue.pop(path_with_disk))
{
const auto & [path, disk] = path_with_disk;
try
{
getDisk()->removeFile(path);
disk->removeFile(path);
LOG_INFO(log, "Removed changelog {} because of compaction.", path);
}
catch (Exception & e)

View File

@ -60,6 +60,7 @@ struct ChangelogFileDescription
uint64_t to_log_index;
std::string extension;
DiskPtr disk;
std::string path;
bool deleted = false;
@ -154,6 +155,7 @@ private:
static ChangelogRecord buildRecord(uint64_t index, const LogEntryPtr & log_entry);
DiskPtr getDisk() const;
DiskPtr getCurrentLogDisk() const;
/// Currently existing changelogs
std::map<uint64_t, ChangelogFileDescriptionPtr> existing_changelogs;
@ -187,7 +189,7 @@ private:
uint64_t max_log_id = 0;
/// For compaction, queue of delete not used logs
/// 128 is enough, even if log is not removed, it's not a problem
ConcurrentBoundedQueue<std::string> log_files_to_delete_queue{128};
ConcurrentBoundedQueue<std::pair<std::string, DiskPtr>> log_files_to_delete_queue{128};
ThreadFromGlobalPool clean_log_thread;
struct AppendLog

View File

@ -21,6 +21,12 @@ void KeeperContext::initialize(const Poco::Util::AbstractConfiguration & config)
disk_selector->initialize(config, "storage_configuration.disks", Context::getGlobalContextInstance());
log_storage = getLogsPathFromConfig(config);
if (config.has("keeper_server.current_log_storage_disk"))
current_log_storage = config.getString("keeper_server.current_log_storage_disk");
else
current_log_storage = log_storage;
snapshot_storage = getSnapshotsPathFromConfig(config);
state_file_storage = getStatePathFromConfig(config);
@ -57,7 +63,6 @@ DiskPtr KeeperContext::getDisk(const Storage & storage) const
return *storage_disk;
const auto & disk_name = std::get<std::string>(storage);
return disk_selector->get(disk_name);
}
@ -66,6 +71,11 @@ DiskPtr KeeperContext::getLogDisk() const
return getDisk(log_storage);
}
DiskPtr KeeperContext::getCurrentLogDisk() const
{
return getDisk(current_log_storage);
}
DiskPtr KeeperContext::getSnapshotsDisk() const
{
return getDisk(snapshot_storage);

View File

@ -32,6 +32,7 @@ public:
bool digestEnabled() const;
void setDigestEnabled(bool digest_enabled_);
DiskPtr getCurrentLogDisk() const;
DiskPtr getLogDisk() const;
DiskPtr getSnapshotsDisk() const;
DiskPtr getStateFileDisk() const;
@ -53,6 +54,7 @@ private:
std::shared_ptr<DiskSelector> disk_selector;
Storage log_storage;
Storage current_log_storage;
Storage snapshot_storage;
Storage state_file_storage;

View File

@ -520,19 +520,41 @@ KeeperSnapshotManager::KeeperSnapshotManager(
, keeper_context(keeper_context_)
{
auto disk = getDisk();
std::unordered_set<std::string> invalid_snapshots;
/// collect invalid snapshots
for (auto it = disk->iterateDirectory(""); it->isValid(); it->next())
{
const auto & name = it->name();
if (name.empty())
continue;
if (startsWith(name, "tmp_"))
{
disk->removeFile(it->path());
invalid_snapshots.insert(name.substr(4));
continue;
}
}
/// process snapshots
for (auto it = disk->iterateDirectory(""); it->isValid(); it->next())
{
const auto & name = it->name();
if (name.empty())
continue;
/// Not snapshot file
if (!startsWith(name, "snapshot_"))
continue;
if (invalid_snapshots.contains(name))
{
disk->removeFile(it->path());
continue;
}
/// Not snapshot file
if (!startsWith(name, "snapshot_"))
continue;
size_t snapshot_up_to = getSnapshotPathUpToLogIdx(name);
existing_snapshots[snapshot_up_to] = it->path();
}
@ -549,11 +571,16 @@ std::string KeeperSnapshotManager::serializeSnapshotBufferToDisk(nuraft::buffer
auto tmp_snapshot_file_name = "tmp_" + snapshot_file_name;
auto disk = getDisk();
auto plain_buf = disk->writeFile(tmp_snapshot_file_name);
{
disk->writeFile(tmp_snapshot_file_name);
}
auto plain_buf = disk->writeFile(snapshot_file_name);
copyData(reader, *plain_buf);
plain_buf->sync();
disk->moveFile(tmp_snapshot_file_name, snapshot_file_name);
disk->removeFile(tmp_snapshot_file_name);
existing_snapshots.emplace(up_to_log_idx, snapshot_file_name);
removeOutdatedSnapshotsIfNeeded();
@ -673,7 +700,12 @@ std::pair<std::string, std::error_code> KeeperSnapshotManager::serializeSnapshot
auto snapshot_file_name = getSnapshotFileName(up_to_log_idx, compress_snapshots_zstd);
auto tmp_snapshot_file_name = "tmp_" + snapshot_file_name;
auto writer = getDisk()->writeFile(tmp_snapshot_file_name);
auto disk = getDisk();
{
disk->writeFile(tmp_snapshot_file_name);
}
auto writer = disk->writeFile(snapshot_file_name);
std::unique_ptr<WriteBuffer> compressed_writer;
if (compress_snapshots_zstd)
compressed_writer = wrapWriteBufferWithCompressionMethod(std::move(writer), CompressionMethod::Zstd, 3);
@ -688,7 +720,8 @@ std::pair<std::string, std::error_code> KeeperSnapshotManager::serializeSnapshot
try
{
getDisk()->moveFile(tmp_snapshot_file_name, snapshot_file_name);
std::cout << "Removing file " << tmp_snapshot_file_name << std::endl;
disk->removeFile(tmp_snapshot_file_name);
}
catch (fs::filesystem_error & e)
{

View File

@ -385,7 +385,7 @@ nuraft::ptr<nuraft::srv_state> KeeperStateManager::read_state()
}
auto state = nuraft::srv_state::deserialize(*state_buf);
LOG_INFO(logger, "Read state from {}", disk->getPath() + path);
LOG_INFO(logger, "Read state from {}", fs::path(disk->getPath()) / path);
return state;
}
catch (const std::exception & e)
@ -408,7 +408,6 @@ nuraft::ptr<nuraft::srv_state> KeeperStateManager::read_state()
if (state)
{
disk->removeFileIfExists(old_path);
return state;
}

View File

@ -28,6 +28,10 @@ namespace CurrentMetrics
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
struct ContextSharedPart : boost::noncopyable
{

View File

@ -381,6 +381,11 @@ void SettingFieldMap::readBinary(ReadBuffer & in)
#else
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
SettingFieldMap::SettingFieldMap(const Field &) : value(Map()) {}
String SettingFieldMap::toString() const
{

View File

@ -120,6 +120,7 @@ const IMetadataStorage & MetadataStorageFromPlainObjectStorageTransaction::getSt
void MetadataStorageFromPlainObjectStorageTransaction::unlinkFile(const std::string & path)
{
auto object = StoredObject(metadata_storage.getAbsolutePath(path));
std::cout << "Removing from plain " << path << std::endl;
metadata_storage.object_storage->removeObject(object);
}

View File

@ -63,7 +63,7 @@ public:
uint32_t getHardlinkCount(const std::string & /* path */) const override
{
return 1;
return 0;
}
bool supportsChmod() const override { return false; }

View File

@ -1,5 +1,6 @@
#include <IO/ZstdDeflatingAppendableWriteBuffer.h>
#include <Common/Exception.h>
#include "IO/ReadBufferFromFileBase.h"
#include <IO/ReadBufferFromFile.h>
namespace DB
@ -14,11 +15,13 @@ ZstdDeflatingAppendableWriteBuffer::ZstdDeflatingAppendableWriteBuffer(
std::unique_ptr<WriteBufferFromFileBase> out_,
int compression_level,
bool append_to_existing_file_,
std::function<std::unique_ptr<ReadBufferFromFileBase>()> read_buffer_creator_,
size_t buf_size,
char * existing_memory,
size_t alignment)
: BufferWithOwnMemory(buf_size, existing_memory, alignment)
, out(std::move(out_))
, read_buffer_creator(std::move(read_buffer_creator_))
, append_to_existing_file(append_to_existing_file_)
{
cctx = ZSTD_createCCtx();
@ -194,13 +197,13 @@ void ZstdDeflatingAppendableWriteBuffer::addEmptyBlock()
bool ZstdDeflatingAppendableWriteBuffer::isNeedToAddEmptyBlock()
{
ReadBufferFromFile reader(out->getFileName());
auto fsize = reader.getFileSize();
auto reader = read_buffer_creator();
auto fsize = reader->getFileSize();
if (fsize > 3)
{
std::array<char, 3> result;
reader.seek(fsize - 3, SEEK_SET);
reader.readStrict(result.data(), 3);
reader->seek(fsize - 3, SEEK_SET);
reader->readStrict(result.data(), 3);
/// If we don't have correct block in the end, then we need to add it manually.
/// NOTE: maybe we can have the same bytes in case of data corruption/unfinished write.

View File

@ -5,6 +5,7 @@
#include <IO/WriteBuffer.h>
#include <IO/WriteBufferDecorator.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/ReadBufferFromFileBase.h>
#include <zstd.h>
@ -32,6 +33,7 @@ public:
std::unique_ptr<WriteBufferFromFileBase> out_,
int compression_level,
bool append_to_existing_file_,
std::function<std::unique_ptr<ReadBufferFromFileBase>()> read_buffer_creator_,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr,
size_t alignment = 0);
@ -69,6 +71,7 @@ private:
void addEmptyBlock();
std::unique_ptr<WriteBufferFromFileBase> out;
std::function<std::unique_ptr<ReadBufferFromFileBase>()> read_buffer_creator;
bool append_to_existing_file = false;
ZSTD_CCtx * cctx;

View File

@ -1235,7 +1235,7 @@ struct HTTPContext : public IHTTPContext
}
#else
#else
#include <Coordination/Standalone/Context.h>