Add truncate

This commit is contained in:
Antonio Andelic 2022-12-15 13:09:52 +00:00
parent bf6f163952
commit ee900ca7df
4 changed files with 109 additions and 59 deletions

View File

@ -29,10 +29,10 @@ namespace
constexpr auto DEFAULT_PREFIX = "changelog"; constexpr auto DEFAULT_PREFIX = "changelog";
std::string formatChangelogPath(const std::string & prefix, const ChangelogFileDescription & name, bool with_to_log) std::string formatChangelogPath(const std::string & prefix, const std::string & name_prefix, uint64_t from_index, uint64_t to_index, const std::string & extension)
{ {
std::filesystem::path path(prefix); std::filesystem::path path(prefix);
path /= std::filesystem::path(name.prefix + "_" + std::to_string(name.from_log_index) + (with_to_log ? "_" + std::to_string(name.to_log_index) : "") + "." + name.extension); path /= std::filesystem::path(name_prefix + "_" + std::to_string(from_index) + "_" + std::to_string(to_index) + "." + extension);
return path; return path;
} }
@ -66,7 +66,7 @@ Checksum computeRecordChecksum(const ChangelogRecord & record)
return hash.get64(); return hash.get64();
} }
constexpr size_t log_size = 50 * 1024 * 1024; constexpr size_t log_size = 60 * 1024 * 1024;
} }
@ -97,10 +97,23 @@ public:
{ {
file_buf = std::make_unique<WriteBufferFromFile>(file_description.path, DBMS_DEFAULT_BUFFER_SIZE, mode == WriteMode::Rewrite ? -1 : (O_APPEND | O_CREAT | O_WRONLY)); file_buf = std::make_unique<WriteBufferFromFile>(file_description.path, DBMS_DEFAULT_BUFFER_SIZE, mode == WriteMode::Rewrite ? -1 : (O_APPEND | O_CREAT | O_WRONLY));
start_index = file_description.from_log_index; start_index = file_description.from_log_index;
last_index = start_index;
current_file_description = &file_description; current_file_description = &file_description;
if (compress_logs && mode == WriteMode::Append && ZstdDeflatingAppendableWriteBuffer::isNeedToAddEmptyBlock(file_description.path))
{
ZstdDeflatingAppendableWriteBuffer::addEmptyBlock(*file_buf);
file_buf->sync();
}
prealloc_done = false; prealloc_done = false;
} }
bool isFileSet() const
{
return file_buf != nullptr;
}
bool appendRecord(ChangelogRecord && record) bool appendRecord(ChangelogRecord && record)
{ {
@ -111,11 +124,10 @@ public:
if (!prealloc_done) [[unlikely]] if (!prealloc_done) [[unlikely]]
{ {
int res = fallocate(file_buf->getFD(), FALLOC_FL_KEEP_SIZE, 0, log_size); tryPreallocateForFile();
if (res == ENOSPC)
return false;
prealloc_done = true; if (!prealloc_done)
return false;
} }
writeIntBinary(computeRecordChecksum(record), getBuffer()); writeIntBinary(computeRecordChecksum(record), getBuffer());
@ -139,13 +151,22 @@ public:
cur_memory_buf.finalize(); cur_memory_buf.finalize();
// TODO: rotate if it's larger than max log size if (written + memory.size() > total_bytes_available)
{
LOG_INFO(&Poco::Logger::get("LOGGER"), "Size of the file is too big, rotating");
rotate(record.header.index);
tryPreallocateForFile();
if (!prealloc_done)
return false;
}
file_buf->write(memory.raw_data(), memory.size()); file_buf->write(memory.raw_data(), memory.size());
cur_memory_buf.restart(); written += memory.size();
current_file_description->to_log_index++; cur_memory_buf.restart();
last_index = record.header.index;
return true; return true;
} }
@ -163,14 +184,23 @@ public:
void rotate(uint64_t new_start_log_index) void rotate(uint64_t new_start_log_index)
{ {
if (file_buf)
{
if (compress_logs && ZstdDeflatingAppendableWriteBuffer::isNeedToAddEmptyBlock(current_file_description->path))
{
ZstdDeflatingAppendableWriteBuffer::addEmptyBlock(*file_buf);
file_buf->sync();
written += ZstdDeflatingAppendableWriteBuffer::ZSTD_CORRECT_TERMINATION_LAST_BLOCK.size();
}
flush(); flush();
if (current_file_description) ftruncate(file_buf->getFD(), written);
{
const auto old_path = current_file_description->path;
auto new_path = formatChangelogPath(changelogs_dir, *current_file_description, true);
std::filesystem::rename(old_path, new_path); LOG_INFO(&Poco::Logger::get("LOGGER"), "Truncated to {} bytes", written);
auto new_path = formatChangelogPath(changelogs_dir, current_file_description->prefix, current_file_description->from_log_index, last_index, current_file_description->extension);
std::filesystem::rename(current_file_description->path, new_path);
current_file_description->path = std::move(new_path); current_file_description->path = std::move(new_path);
} }
@ -178,13 +208,13 @@ public:
ChangelogFileDescription new_description; ChangelogFileDescription new_description;
new_description.prefix = DEFAULT_PREFIX; new_description.prefix = DEFAULT_PREFIX;
new_description.from_log_index = new_start_log_index; new_description.from_log_index = new_start_log_index;
new_description.to_log_index = new_start_log_index; new_description.to_log_index = new_start_log_index + rotate_interval - 1;
new_description.extension = "bin"; new_description.extension = "bin";
if (compress_logs) if (compress_logs)
new_description.extension += "." + toContentEncodingName(CompressionMethod::Zstd); new_description.extension += "." + toContentEncodingName(CompressionMethod::Zstd);
new_description.path = formatChangelogPath(changelogs_dir, new_description, false); new_description.path = formatChangelogPath(changelogs_dir, new_description.prefix, new_start_log_index, new_start_log_index + rotate_interval - 1, new_description.extension);
LOG_TRACE(&Poco::Logger::get("Changelog"), "Starting new changelog {}", new_description.path); LOG_TRACE(&Poco::Logger::get("Changelog"), "Starting new changelog {}", new_description.path);
auto [it, inserted] = existing_changelogs.insert(std::make_pair(new_start_log_index, std::move(new_description))); auto [it, inserted] = existing_changelogs.insert(std::make_pair(new_start_log_index, std::move(new_description)));
@ -200,6 +230,28 @@ private:
return *memory_buf; return *memory_buf;
} }
void tryPreallocateForFile()
{
int res = fallocate(file_buf->getFD(), FALLOC_FL_KEEP_SIZE, 0, log_size);
if (res == ENOSPC)
return;
struct stat buf;
res = fstat(file_buf->getFD(), &buf);
assert(res == 0);
LOG_INFO(&Poco::Logger::get("LOGGER"), "ALLOCATED BLOCKS {}, size {}", buf.st_blocks, buf.st_size);
written = buf.st_size;
total_bytes_available = buf.st_blocks * 512;
// always leave space for last termination block
if (compress_logs)
total_bytes_available -= ZstdDeflatingAppendableWriteBuffer::ZSTD_CORRECT_TERMINATION_LAST_BLOCK.size();
prealloc_done = true;
}
std::map<uint64_t, ChangelogFileDescription> & existing_changelogs; std::map<uint64_t, ChangelogFileDescription> & existing_changelogs;
using MemoryBuffer = PODArray<UInt8>; using MemoryBuffer = PODArray<UInt8>;
@ -209,6 +261,8 @@ private:
ChangelogFileDescription * current_file_description{nullptr}; ChangelogFileDescription * current_file_description{nullptr};
std::unique_ptr<WriteBufferFromFile> file_buf; std::unique_ptr<WriteBufferFromFile> file_buf;
size_t written{0};
size_t total_bytes_available{0};
std::unique_ptr<ZstdDeflatingAppendableWriteBuffer> compressed_buffer; std::unique_ptr<ZstdDeflatingAppendableWriteBuffer> compressed_buffer;
@ -222,9 +276,8 @@ private:
const std::filesystem::path changelogs_dir; const std::filesystem::path changelogs_dir;
//size_t written{0};
uint64_t start_index; uint64_t start_index;
uint64_t last_index;
}; };
struct ChangelogReadResult struct ChangelogReadResult
@ -423,7 +476,7 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin
removeAllLogs(); removeAllLogs();
min_log_id = last_commited_log_index; min_log_id = last_commited_log_index;
max_log_id = last_commited_log_index == 0 ? 0 : last_commited_log_index - 1; max_log_id = last_commited_log_index == 0 ? 0 : last_commited_log_index - 1;
rotate(max_log_id + 1, writer_lock); current_writer->rotate(max_log_id + 1);
return; return;
} }
else if (changelog_description.from_log_index > start_to_read_from) else if (changelog_description.from_log_index > start_to_read_from)
@ -513,8 +566,8 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin
} }
/// Start new log if we don't initialize writer from previous log. All logs can be "complete". /// Start new log if we don't initialize writer from previous log. All logs can be "complete".
if (!current_writer) if (!current_writer->isFileSet())
rotate(max_log_id + 1, writer_lock); current_writer->rotate(max_log_id + 1);
initialized = true; initialized = true;
} }
@ -760,7 +813,7 @@ void Changelog::compact(uint64_t up_to_log_index)
std::erase_if(logs, [up_to_log_index] (const auto & item) { return item.first <= up_to_log_index; }); std::erase_if(logs, [up_to_log_index] (const auto & item) { return item.first <= up_to_log_index; });
if (need_rotate) if (need_rotate)
rotate(up_to_log_index + 1, lock); current_writer->rotate(up_to_log_index + 1);
LOG_INFO(log, "Compaction up to {} finished new min index {}, new max index {}", up_to_log_index, min_log_id, max_log_id); LOG_INFO(log, "Compaction up to {} finished new min index {}, new max index {}", up_to_log_index, min_log_id, max_log_id);
} }

View File

@ -147,9 +147,6 @@ private:
/// Pack log_entry into changelog record /// Pack log_entry into changelog record
static ChangelogRecord buildRecord(uint64_t index, const LogEntryPtr & log_entry); static ChangelogRecord buildRecord(uint64_t index, const LogEntryPtr & log_entry);
/// Starts new file [new_start_log_index, new_start_log_index + rotate_interval]
void rotate(uint64_t new_start_log_index, std::lock_guard<std::mutex> & writer_lock);
/// Currently existing changelogs /// Currently existing changelogs
std::map<uint64_t, ChangelogFileDescription> existing_changelogs; std::map<uint64_t, ChangelogFileDescription> existing_changelogs;

View File

@ -160,42 +160,42 @@ void ZstdDeflatingAppendableWriteBuffer::finalizeZstd()
} }
} }
void ZstdDeflatingAppendableWriteBuffer::addEmptyBlock() void ZstdDeflatingAppendableWriteBuffer::addEmptyBlock(WriteBuffer & write_buffer)
{ {
/// HACK: https://github.com/facebook/zstd/issues/2090#issuecomment-620158967 /// HACK: https://github.com/facebook/zstd/issues/2090#issuecomment-620158967
if (out->buffer().size() - out->offset() < ZSTD_CORRECT_TERMINATION_LAST_BLOCK.size()) if (write_buffer.buffer().size() - write_buffer.offset() < ZSTD_CORRECT_TERMINATION_LAST_BLOCK.size())
out->next(); write_buffer.next();
std::memcpy(out->buffer().begin() + out->offset(), std::memcpy(write_buffer.buffer().begin() + write_buffer.offset(),
ZSTD_CORRECT_TERMINATION_LAST_BLOCK.data(), ZSTD_CORRECT_TERMINATION_LAST_BLOCK.size()); ZSTD_CORRECT_TERMINATION_LAST_BLOCK.data(), ZSTD_CORRECT_TERMINATION_LAST_BLOCK.size());
out->position() = out->buffer().begin() + out->offset() + ZSTD_CORRECT_TERMINATION_LAST_BLOCK.size(); write_buffer.position() = write_buffer.buffer().begin() + write_buffer.offset() + ZSTD_CORRECT_TERMINATION_LAST_BLOCK.size();
} }
bool ZstdDeflatingAppendableWriteBuffer::isNeedToAddEmptyBlock() bool ZstdDeflatingAppendableWriteBuffer::isNeedToAddEmptyBlock(const std::string & file_name)
{ {
//ReadBufferFromFile reader(out->getFileName()); ReadBufferFromFile reader(file_name);
//auto fsize = reader.getFileSize(); auto fsize = reader.getFileSize();
//if (fsize > 3) if (fsize > 3)
//{ {
// std::array<char, 3> result; std::array<char, 3> result;
// reader.seek(fsize - 3, SEEK_SET); reader.seek(fsize - 3, SEEK_SET);
// reader.readStrict(result.data(), 3); reader.readStrict(result.data(), 3);
// /// If we don't have correct block in the end, then we need to add it manually. /// If we don't have correct block in the end, then we need to add it manually.
// /// NOTE: maybe we can have the same bytes in case of data corruption/unfinished write. /// NOTE: maybe we can have the same bytes in case of data corruption/unfinished write.
// /// But in this case file still corrupted and we have to remove it. /// But in this case file still corrupted and we have to remove it.
// return result != ZSTD_CORRECT_TERMINATION_LAST_BLOCK; return result != ZSTD_CORRECT_TERMINATION_LAST_BLOCK;
//} }
//else if (fsize > 0) else if (fsize > 0)
//{ {
// throw Exception( throw Exception(
// ErrorCodes::ZSTD_ENCODER_FAILED, ErrorCodes::ZSTD_ENCODER_FAILED,
// "Trying to write to non-empty file '{}' with tiny size {}. It can lead to data corruption", "Trying to write to non-empty file '{}' with tiny size {}. It can lead to data corruption",
// out->getFileName(), fsize); file_name, fsize);
//} }
return false; return false;
} }

View File

@ -45,6 +45,13 @@ public:
WriteBuffer * getNestedBuffer() { return out.get(); } WriteBuffer * getNestedBuffer() { return out.get(); }
/// Read three last bytes from non-empty compressed file and compares them with
/// ZSTD_CORRECT_TERMINATION_LAST_BLOCK.
static bool isNeedToAddEmptyBlock(const std::string & file_name);
/// Adding zstd empty block (ZSTD_CORRECT_TERMINATION_LAST_BLOCK) to out.working_buffer
static void addEmptyBlock(WriteBuffer & write_buffer);
private: private:
/// NOTE: will fill compressed data to the out.working_buffer, but will not call out.next method until the buffer is full /// NOTE: will fill compressed data to the out.working_buffer, but will not call out.next method until the buffer is full
void nextImpl() override; void nextImpl() override;
@ -60,13 +67,6 @@ private:
void finalizeAfter(); void finalizeAfter();
void finalizeZstd(); void finalizeZstd();
/// Read three last bytes from non-empty compressed file and compares them with
/// ZSTD_CORRECT_TERMINATION_LAST_BLOCK.
static bool isNeedToAddEmptyBlock();
/// Adding zstd empty block (ZSTD_CORRECT_TERMINATION_LAST_BLOCK) to out.working_buffer
void addEmptyBlock();
std::unique_ptr<WriteBuffer> out; std::unique_ptr<WriteBuffer> out;
//bool append_to_existing_file = false; //bool append_to_existing_file = false;