Try with fallocate

This commit is contained in:
Antonio Andelic 2022-12-13 11:08:59 +00:00
parent bca18298a1
commit 898cf867af
5 changed files with 105 additions and 57 deletions

View File

@ -66,6 +66,8 @@ Checksum computeRecordChecksum(const ChangelogRecord & record)
return hash.get64();
}
constexpr size_t log_size = 50 * 1024 * 1024;
}
class ChangelogWriter
@ -74,6 +76,7 @@ public:
ChangelogWriter(const std::string & filepath_, WriteMode mode, uint64_t start_index_)
: filepath(filepath_)
, file_buf(std::make_unique<WriteBufferFromFile>(filepath, DBMS_DEFAULT_BUFFER_SIZE, mode == WriteMode::Rewrite ? -1 : (O_APPEND | O_CREAT | O_WRONLY)))
, memory_buf(std::make_unique<WriteBufferMemory>(memory))
, start_index(start_index_)
{
auto compression_method = chooseCompressionMethod(filepath_, "");
@ -84,7 +87,7 @@ public:
else if (compression_method == CompressionMethod::Zstd)
{
compressed_buffer = std::make_unique<ZstdDeflatingAppendableWriteBuffer>(
std::move(file_buf), /* compression level = */ 3, /* append_to_existing_file_ = */ mode == WriteMode::Append);
std::move(memory_buf), /* compression level = */ 3);
}
else
{
@ -93,8 +96,17 @@ public:
}
void appendRecord(ChangelogRecord && record)
bool appendRecord(ChangelogRecord && record)
{
if (!prealloc_done) [[unlikely]]
{
int res = fallocate(file_buf->getFD(), FALLOC_FL_KEEP_SIZE, 0, log_size);
if (res == ENOSPC)
return false;
prealloc_done = true;
}
writeIntBinary(computeRecordChecksum(record), getBuffer());
writeIntBinary(record.header.version, getBuffer());
@ -105,6 +117,9 @@ public:
if (record.header.blob_size != 0)
getBuffer().write(reinterpret_cast<char *>(record.blob->data_begin()), record.blob->size());
// TODO: Maybe try writing to file after each record
return true;
}
void flush(bool force_fsync)
@ -115,14 +130,18 @@ public:
compressed_buffer->next();
}
WriteBuffer * working_buf = compressed_buffer ? compressed_buffer->getNestedBuffer() : file_buf.get();
WriteBufferMemory & cur_memory_buf = compressed_buffer ? dynamic_cast<WriteBufferMemory &>(*compressed_buffer->getNestedBuffer()) : *memory_buf;
/// Flush working buffer to file system
working_buf->next();
cur_memory_buf.finalize();
// TODO: rotate if it's larger than max log size
file_buf->write(memory.raw_data(), memory.size());
/// Fsync file system if needed
if (force_fsync)
working_buf->sync();
file_buf->sync();
cur_memory_buf.restart();
}
uint64_t getStartIndex() const
@ -135,12 +154,22 @@ private:
{
if (compressed_buffer)
return *compressed_buffer;
return *file_buf;
return *memory_buf;
}
using MemoryBuffer = PODArray<UInt8>;
using WriteBufferMemory = WriteBufferFromVector<MemoryBuffer>;
MemoryBuffer memory;
std::string filepath;
std::unique_ptr<WriteBufferFromFile> file_buf;
std::unique_ptr<WriteBufferMemory> memory_buf;
std::unique_ptr<ZstdDeflatingAppendableWriteBuffer> compressed_buffer;
bool prealloc_done{false};
//size_t written{0};
uint64_t start_index;
};
@ -550,12 +579,16 @@ ChangelogRecord Changelog::buildRecord(uint64_t index, const LogEntryPtr & log_e
void Changelog::writeThread()
{
WriteOperation write_operation;
bool batch_append_ok = true;
while (write_operations.pop(write_operation))
{
assert(initialized);
if (auto * append_log = std::get_if<AppendLog>(&write_operation))
{
if (!batch_append_ok)
continue;
std::lock_guard writer_lock(writer_mutex);
assert(current_writer);
@ -565,30 +598,40 @@ void Changelog::writeThread()
if (log_is_complete)
rotate(append_log->index, writer_lock);
current_writer->appendRecord(buildRecord(append_log->index, append_log->log_entry));
batch_append_ok &= current_writer->appendRecord(buildRecord(append_log->index, append_log->log_entry));
}
else
{
const auto & flush = std::get<Flush>(write_operation);
if (batch_append_ok)
{
std::lock_guard writer_lock(writer_mutex);
if (current_writer)
current_writer->flush(force_sync);
}
{
std::lock_guard writer_lock(writer_mutex);
if (current_writer)
current_writer->flush(force_sync);
}
{
std::lock_guard lock{durable_idx_mutex};
last_durable_idx = flush.index;
}
}
else
{
std::lock_guard lock{durable_idx_mutex};
last_durable_idx = flush.index;
*flush.failed = true;
}
durable_idx_cv.notify_all();
// we shouldn't start the raft_server before sending it here
if (auto raft_server_locked = raft_server.lock())
raft_server_locked->notify_log_append_completion(true);
raft_server_locked->notify_log_append_completion(batch_append_ok);
else
LOG_WARNING(log, "Raft server is not set in LogStore.");
batch_append_ok = true;
}
}
}
@ -799,21 +842,29 @@ void Changelog::applyEntriesFromBuffer(uint64_t index, nuraft::buffer & buffer)
}
}
void Changelog::flush()
bool Changelog::flush()
{
if (flushAsync())
if (auto failed_ptr = flushAsync())
{
std::unique_lock lock{durable_idx_mutex};
durable_idx_cv.wait(lock, [&] { return last_durable_idx == max_log_id; });
durable_idx_cv.wait(lock, [&] { return failed_ptr || last_durable_idx == max_log_id; });
return !*failed_ptr;
}
return true;
}
bool Changelog::flushAsync()
std::shared_ptr<bool> Changelog::flushAsync()
{
bool pushed = write_operations.push(Flush{max_log_id});
auto failed = std::make_shared<bool>(false);
bool pushed = write_operations.push(Flush{max_log_id, failed});
if (!pushed)
{
LOG_WARNING(log, "Changelog is shut down");
return pushed;
return nullptr;
}
return failed;
}
void Changelog::shutdown()

View File

@ -121,9 +121,9 @@ public:
void applyEntriesFromBuffer(uint64_t index, nuraft::buffer & buffer);
/// Fsync latest log to disk and flush buffer
void flush();
bool flush();
bool flushAsync();
std::shared_ptr<bool> flushAsync();
void shutdown();
@ -197,6 +197,7 @@ private:
struct Flush
{
uint64_t index;
std::shared_ptr<bool> failed;
};
using WriteOperation = std::variant<AppendLog, Flush>;

View File

@ -90,8 +90,7 @@ bool KeeperLogStore::compact(uint64_t last_log_index)
bool KeeperLogStore::flush()
{
std::lock_guard lock(changelog_lock);
changelog.flush();
return true;
return changelog.flush();
}
void KeeperLogStore::apply_pack(uint64_t index, nuraft::buffer & pack)

View File

@ -11,15 +11,13 @@ namespace ErrorCodes
}
ZstdDeflatingAppendableWriteBuffer::ZstdDeflatingAppendableWriteBuffer(
std::unique_ptr<WriteBufferFromFile> out_,
std::unique_ptr<WriteBuffer> out_,
int compression_level,
bool append_to_existing_file_,
size_t buf_size,
char * existing_memory,
size_t alignment)
: BufferWithOwnMemory(buf_size, existing_memory, alignment)
, out(std::move(out_))
, append_to_existing_file(append_to_existing_file_)
{
cctx = ZSTD_createCCtx();
if (cctx == nullptr)
@ -41,11 +39,11 @@ void ZstdDeflatingAppendableWriteBuffer::nextImpl()
input.size = offset();
input.pos = 0;
if (first_write && append_to_existing_file && isNeedToAddEmptyBlock())
{
addEmptyBlock();
first_write = false;
}
//if (first_write && append_to_existing_file && isNeedToAddEmptyBlock())
//{
// addEmptyBlock();
// first_write = false;
//}
try
{
@ -178,26 +176,26 @@ void ZstdDeflatingAppendableWriteBuffer::addEmptyBlock()
bool ZstdDeflatingAppendableWriteBuffer::isNeedToAddEmptyBlock()
{
ReadBufferFromFile reader(out->getFileName());
auto fsize = reader.getFileSize();
if (fsize > 3)
{
std::array<char, 3> result;
reader.seek(fsize - 3, SEEK_SET);
reader.readStrict(result.data(), 3);
//ReadBufferFromFile reader(out->getFileName());
//auto fsize = reader.getFileSize();
//if (fsize > 3)
//{
// std::array<char, 3> result;
// reader.seek(fsize - 3, SEEK_SET);
// reader.readStrict(result.data(), 3);
/// If we don't have correct block in the end, then we need to add it manually.
/// NOTE: maybe we can have the same bytes in case of data corruption/unfinished write.
/// But in this case file still corrupted and we have to remove it.
return result != ZSTD_CORRECT_TERMINATION_LAST_BLOCK;
}
else if (fsize > 0)
{
throw Exception(
ErrorCodes::ZSTD_ENCODER_FAILED,
"Trying to write to non-empty file '{}' with tiny size {}. It can lead to data corruption",
out->getFileName(), fsize);
}
// /// If we don't have correct block in the end, then we need to add it manually.
// /// NOTE: maybe we can have the same bytes in case of data corruption/unfinished write.
// /// But in this case file still corrupted and we have to remove it.
// return result != ZSTD_CORRECT_TERMINATION_LAST_BLOCK;
//}
//else if (fsize > 0)
//{
// throw Exception(
// ErrorCodes::ZSTD_ENCODER_FAILED,
// "Trying to write to non-empty file '{}' with tiny size {}. It can lead to data corruption",
// out->getFileName(), fsize);
//}
return false;
}

View File

@ -29,9 +29,8 @@ public:
static inline constexpr ZSTDLastBlock ZSTD_CORRECT_TERMINATION_LAST_BLOCK = {0x01, 0x00, 0x00};
ZstdDeflatingAppendableWriteBuffer(
std::unique_ptr<WriteBufferFromFile> out_,
std::unique_ptr<WriteBuffer> out_,
int compression_level,
bool append_to_existing_file_,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr,
size_t alignment = 0);
@ -63,14 +62,14 @@ private:
/// Read three last bytes from non-empty compressed file and compares them with
/// ZSTD_CORRECT_TERMINATION_LAST_BLOCK.
bool isNeedToAddEmptyBlock();
static bool isNeedToAddEmptyBlock();
/// Adding zstd empty block (ZSTD_CORRECT_TERMINATION_LAST_BLOCK) to out.working_buffer
void addEmptyBlock();
std::unique_ptr<WriteBufferFromFile> out;
std::unique_ptr<WriteBuffer> out;
bool append_to_existing_file = false;
//bool append_to_existing_file = false;
ZSTD_CCtx * cctx;
ZSTD_inBuffer input;
ZSTD_outBuffer output;