style fix

This commit is contained in:
Antonio Andelic 2022-12-22 10:31:07 +00:00
parent f073878c40
commit 55548a6678
3 changed files with 142 additions and 70 deletions

View File

@ -114,18 +114,18 @@ public:
rotate_interval,
file_description->expectedEntriesCountInLog());
if (file_buf)
if (file_buf && prealloc_done)
{
finalizeCurrentFile(/* final */ false);
finalizeCurrentFile(/*final*/ false);
assert(current_file_description);
if (last_index && *last_index != current_file_description->to_log_index)
if (last_index_written && *last_index_written != current_file_description->to_log_index)
{
auto new_path = formatChangelogPath(
changelogs_dir,
current_file_description->prefix,
current_file_description->from_log_index,
*last_index,
*last_index_written,
current_file_description->extension);
std::filesystem::rename(current_file_description->path, new_path);
current_file_description->path = std::move(new_path);
@ -134,8 +134,7 @@ public:
file_buf = std::make_unique<WriteBufferFromFile>(
file_description->path, DBMS_DEFAULT_BUFFER_SIZE, mode == WriteMode::Rewrite ? -1 : (O_APPEND | O_CREAT | O_WRONLY));
start_index = file_description->from_log_index;
last_index.reset();
last_index_written.reset();
current_file_description = std::move(file_description);
if (compress_logs && mode == WriteMode::Append && ZstdDeflatingAppendableWriteBuffer::isNeedToAddEmptyBlock(current_file_description->path))
@ -143,12 +142,14 @@ public:
ZstdDeflatingAppendableWriteBuffer::addEmptyBlock(*file_buf);
flush();
}
} catch (...) {
prealloc_done = false;
}
catch (...)
{
tryLogCurrentException(&Poco::Logger::get("Changelog"));
throw;
}
prealloc_done = false;
}
bool isFileSet() const { return file_buf != nullptr; }
@ -166,49 +167,62 @@ public:
tryPreallocateForFile();
if (!prealloc_done)
{
return false;
}
}
writeIntBinary(computeRecordChecksum(record), getBuffer());
writeIntBinary(record.header.version, getBuffer());
writeIntBinary(record.header.index, getBuffer());
writeIntBinary(record.header.term, getBuffer());
writeIntBinary(record.header.value_type, getBuffer());
writeIntBinary(record.header.blob_size, getBuffer());
if (record.header.blob_size != 0)
getBuffer().write(reinterpret_cast<char *>(record.blob->data_begin()), record.blob->size());
if (compressed_buffer)
const auto write_record = [&]
{
/// Flush compressed data to WriteBufferMemory working_buffer
compressed_buffer->next();
}
writeIntBinary(computeRecordChecksum(record), getBuffer());
WriteBufferMemory & cur_memory_buf
= compressed_buffer ? dynamic_cast<WriteBufferMemory &>(*compressed_buffer->getNestedBuffer()) : *memory_buf;
writeIntBinary(record.header.version, getBuffer());
writeIntBinary(record.header.index, getBuffer());
writeIntBinary(record.header.term, getBuffer());
writeIntBinary(record.header.value_type, getBuffer());
writeIntBinary(record.header.blob_size, getBuffer());
cur_memory_buf.finalize();
if (record.header.blob_size != 0)
getBuffer().write(reinterpret_cast<char *>(record.blob->data_begin()), record.blob->size());
if (compressed_buffer)
{
/// Flush compressed data to WriteBufferMemory working_buffer
compressed_buffer->next();
}
};
write_record();
const auto get_memory_buffer = [&]() -> WriteBufferMemory &
{
return compressed_buffer ? dynamic_cast<WriteBufferMemory &>(*compressed_buffer->getNestedBuffer()) : *memory_buf;
};
// if the file is too big, rotate
if (max_log_file_size != 0 && written + memory.size() > total_bytes_available)
if (max_log_file_size != 0 && total_bytes_written + get_memory_buffer().valuesWritten() > total_bytes_available)
{
LOG_TRACE(&Poco::Logger::get("Changelog"), "Log file reached maximum allowed size ({} bytes), creating new log file", max_log_file_size);
rotate(record.header.index);
tryPreallocateForFile();
if (!prealloc_done)
return false;
// if we use compression we need to write again to the buffer because it's a new one
if (compress_logs)
write_record();
}
file_buf->write(memory.raw_data(), memory.size());
// after possible rotation, we don't check if we have enough space
// because writing at least 1 log is requirement - we don't want empty log files
written += memory.size();
auto & cur_memory_buf = get_memory_buffer();
file_buf->write(memory.raw_data(), cur_memory_buf.valuesWritten());
total_bytes_written += cur_memory_buf.valuesWritten();
cur_memory_buf.restart();
last_index = record.header.index;
last_index_written = record.header.index;
return true;
}
@ -219,7 +233,11 @@ public:
file_buf->sync();
}
uint64_t getStartIndex() const { return start_index; }
uint64_t getStartIndex() const
{
assert(current_file_description);
return current_file_description->from_log_index;
}
void rotate(uint64_t new_start_log_index)
{
@ -248,43 +266,42 @@ public:
void finalize()
{
if (file_buf)
finalizeCurrentFile(/* final */ true);
if (file_buf && prealloc_done)
finalizeCurrentFile(/*final*/ true);
}
private:
void finalizeCurrentFile(bool final)
{
assert(file_buf);
assert(file_buf && prealloc_done);
// reset compression buffer
if (compress_logs)
{
compressed_buffer->finalize();
if (ZstdDeflatingAppendableWriteBuffer::isNeedToAddEmptyBlock(current_file_description->path))
{
ZstdDeflatingAppendableWriteBuffer::addEmptyBlock(*file_buf);
assert(max_log_file_size == 0 || memory.size() < total_bytes_available - total_bytes_written);
total_bytes_written += ZstdDeflatingAppendableWriteBuffer::ZSTD_CORRECT_TERMINATION_LAST_BLOCK.size();
}
WriteBufferMemory & cur_memory_buf
= dynamic_cast<WriteBufferMemory &>(*compressed_buffer->getNestedBuffer());
cur_memory_buf.finalize();
file_buf->write(memory.raw_data(), memory.size());
flush();
assert(max_log_file_size == 0 || !prealloc_done || memory.size() < total_bytes_available - written);
written += memory.size();
// so finalize can be done
cur_memory_buf.restart();
if (final)
compressed_buffer.reset();
else
compressed_buffer = std::make_unique<ZstdDeflatingAppendableWriteBuffer>(std::make_unique<WriteBufferMemory>(memory), /* compression level = */ 3);
}
else
{
flush();
}
flush();
if (max_log_file_size != 0)
ftruncate(file_buf->getFD(), written);
ftruncate(file_buf->getFD(), total_bytes_written);
file_buf.reset();
}
@ -301,7 +318,7 @@ private:
if (max_log_file_size == 0)
{
total_bytes_available = 0;
written = 0;
total_bytes_written = 0;
prealloc_done = true;
return;
}
@ -322,11 +339,11 @@ private:
struct stat buf;
{
int res = fstat(file_buf->getFD(), &buf);
[[maybe_unused]] int res = fstat(file_buf->getFD(), &buf);
assert(res == 0);
}
written = buf.st_size;
total_bytes_written = buf.st_size;
total_bytes_available = fallocate_ok ? buf.st_blocks * 512 : max_log_file_size;
// always leave space for last termination block
@ -341,29 +358,27 @@ private:
using MemoryBuffer = PODArray<UInt8>;
using WriteBufferMemory = WriteBufferFromVector<MemoryBuffer>;
MemoryBuffer memory;
// we write everything to memory buffer to see how many bytes are required
// when we have that information we can try to write to the disk
std::unique_ptr<WriteBufferMemory> memory_buf;
ChangelogFileDescriptionPtr current_file_description{nullptr};
std::unique_ptr<WriteBufferFromFile> file_buf;
size_t written{0};
std::optional<uint64_t> last_index_written;
size_t total_bytes_written{0};
size_t total_bytes_available{0};
std::unique_ptr<ZstdDeflatingAppendableWriteBuffer> compressed_buffer;
bool prealloc_done{false};
bool compress_logs;
bool force_fsync;
uint64_t rotate_interval;
uint64_t max_log_file_size;
// Changelog configuration
const bool compress_logs;
const bool force_fsync;
const uint64_t rotate_interval;
const uint64_t max_log_file_size;
const std::filesystem::path changelogs_dir;
uint64_t start_index;
std::optional<uint64_t> last_index;
};
struct ChangelogReadResult
@ -1068,8 +1083,11 @@ void Changelog::shutdown()
if (write_thread.joinable())
write_thread.join();
current_writer->finalize();
current_writer.reset();
if (current_writer)
{
current_writer->finalize();
current_writer.reset();
}
}
Changelog::~Changelog()

View File

@ -2271,6 +2271,57 @@ TEST_P(CoordinationTest, TestSystemNodeModify)
assert_create("/keeper1/test", Error::ZOK);
}
TEST_P(CoordinationTest, ChangelogTestMaxLogSize)
{
auto params = GetParam();
ChangelogDirTest test("./logs");
uint64_t last_entry_index{0};
size_t i{0};
{
SCOPED_TRACE("Small rotation interval, big size limit");
DB::KeeperLogStore changelog("./logs", 20, true, params.enable_compression, 50 * 1024 * 1024);
changelog.init(1, 0);
for (; i < 100; ++i)
{
auto entry = getLogEntry(std::to_string(i) + "_hello_world", (i + 44) * 10);
last_entry_index = changelog.append(entry);
}
changelog.end_of_append_batch(0, 0);
waitDurableLogs(changelog);
ASSERT_EQ(changelog.entry_at(last_entry_index)->get_term(), (i - 1 + 44) * 10);
}
{
SCOPED_TRACE("Large rotation interval, small size limit");
DB::KeeperLogStore changelog("./logs", 100'000, true, params.enable_compression, 4000);
changelog.init(1, 0);
ASSERT_EQ(changelog.entry_at(last_entry_index)->get_term(), (i - 1 + 44) * 10);
for (; i < 500; ++i)
{
auto entry = getLogEntry(std::to_string(i) + "_hello_world", (i + 44) * 10);
last_entry_index = changelog.append(entry);
}
changelog.end_of_append_batch(0, 0);
waitDurableLogs(changelog);
ASSERT_EQ(changelog.entry_at(last_entry_index)->get_term(), (i - 1 + 44) * 10);
}
{
SCOPED_TRACE("Final verify all logs");
DB::KeeperLogStore changelog("./logs", 100'000, true, params.enable_compression, 4000);
changelog.init(1, 0);
ASSERT_EQ(changelog.entry_at(last_entry_index)->get_term(), (i - 1 + 44) * 10);
}
}
INSTANTIATE_TEST_SUITE_P(CoordinationTestSuite,
CoordinationTest,
::testing::ValuesIn(std::initializer_list<CompressionParam>{

View File

@ -58,6 +58,12 @@ public:
finalized = false;
}
size_t valuesWritten() {
return ((position() - reinterpret_cast<Position>(vector.data())) /// NOLINT
+ sizeof(typename VectorType::value_type) - 1) /// Align up.
/ sizeof(typename VectorType::value_type);
}
~WriteBufferFromVector() override
{
finalize();
@ -66,10 +72,7 @@ public:
private:
void finalizeImpl() override
{
vector.resize(
((position() - reinterpret_cast<Position>(vector.data())) /// NOLINT
+ sizeof(typename VectorType::value_type) - 1) /// Align up.
/ sizeof(typename VectorType::value_type));
vector.resize(valuesWritten());
/// Prevent further writes.
set(nullptr, 0);