diff --git a/src/Coordination/Changelog.cpp b/src/Coordination/Changelog.cpp index 2cdd7b2384e..bb1f09b7b23 100644 --- a/src/Coordination/Changelog.cpp +++ b/src/Coordination/Changelog.cpp @@ -114,18 +114,18 @@ public: rotate_interval, file_description->expectedEntriesCountInLog()); - if (file_buf) + if (file_buf && prealloc_done) { - finalizeCurrentFile(/* final */ false); + finalizeCurrentFile(/*final*/ false); assert(current_file_description); - if (last_index && *last_index != current_file_description->to_log_index) + if (last_index_written && *last_index_written != current_file_description->to_log_index) { auto new_path = formatChangelogPath( changelogs_dir, current_file_description->prefix, current_file_description->from_log_index, - *last_index, + *last_index_written, current_file_description->extension); std::filesystem::rename(current_file_description->path, new_path); current_file_description->path = std::move(new_path); @@ -134,8 +134,7 @@ public: file_buf = std::make_unique( file_description->path, DBMS_DEFAULT_BUFFER_SIZE, mode == WriteMode::Rewrite ? -1 : (O_APPEND | O_CREAT | O_WRONLY)); - start_index = file_description->from_log_index; - last_index.reset(); + last_index_written.reset(); current_file_description = std::move(file_description); if (compress_logs && mode == WriteMode::Append && ZstdDeflatingAppendableWriteBuffer::isNeedToAddEmptyBlock(current_file_description->path)) @@ -143,12 +142,14 @@ public: ZstdDeflatingAppendableWriteBuffer::addEmptyBlock(*file_buf); flush(); } - } catch (...) { + + prealloc_done = false; + } + catch (...) + { tryLogCurrentException(&Poco::Logger::get("Changelog")); throw; } - - prealloc_done = false; } bool isFileSet() const { return file_buf != nullptr; } @@ -166,49 +167,62 @@ public: tryPreallocateForFile(); if (!prealloc_done) - { return false; - } } - writeIntBinary(computeRecordChecksum(record), getBuffer()); - - writeIntBinary(record.header.version, getBuffer()); - writeIntBinary(record.header.index, getBuffer()); - writeIntBinary(record.header.term, getBuffer()); - writeIntBinary(record.header.value_type, getBuffer()); - writeIntBinary(record.header.blob_size, getBuffer()); - - if (record.header.blob_size != 0) - getBuffer().write(reinterpret_cast(record.blob->data_begin()), record.blob->size()); - - if (compressed_buffer) + const auto write_record = [&] { - /// Flush compressed data to WriteBufferMemory working_buffer - compressed_buffer->next(); - } + writeIntBinary(computeRecordChecksum(record), getBuffer()); - WriteBufferMemory & cur_memory_buf - = compressed_buffer ? dynamic_cast(*compressed_buffer->getNestedBuffer()) : *memory_buf; + writeIntBinary(record.header.version, getBuffer()); + writeIntBinary(record.header.index, getBuffer()); + writeIntBinary(record.header.term, getBuffer()); + writeIntBinary(record.header.value_type, getBuffer()); + writeIntBinary(record.header.blob_size, getBuffer()); - cur_memory_buf.finalize(); + if (record.header.blob_size != 0) + getBuffer().write(reinterpret_cast(record.blob->data_begin()), record.blob->size()); + + if (compressed_buffer) + { + /// Flush compressed data to WriteBufferMemory working_buffer + compressed_buffer->next(); + } + }; + + write_record(); + + const auto get_memory_buffer = [&]() -> WriteBufferMemory & + { + return compressed_buffer ? dynamic_cast(*compressed_buffer->getNestedBuffer()) : *memory_buf; + }; // if the file is too big, rotate - if (max_log_file_size != 0 && written + memory.size() > total_bytes_available) + if (max_log_file_size != 0 && total_bytes_written + get_memory_buffer().valuesWritten() > total_bytes_available) { + LOG_TRACE(&Poco::Logger::get("Changelog"), "Log file reached maximum allowed size ({} bytes), creating new log file", max_log_file_size); rotate(record.header.index); tryPreallocateForFile(); if (!prealloc_done) return false; + + // if we use compression we need to write again to the buffer because it's a new one + if (compress_logs) + write_record(); } - file_buf->write(memory.raw_data(), memory.size()); + // after possible rotation, we don't check if we have enough space + // because writing at least 1 log is requirement - we don't want empty log files - written += memory.size(); + auto & cur_memory_buf = get_memory_buffer(); + + file_buf->write(memory.raw_data(), cur_memory_buf.valuesWritten()); + + total_bytes_written += cur_memory_buf.valuesWritten(); cur_memory_buf.restart(); - last_index = record.header.index; + last_index_written = record.header.index; return true; } @@ -219,7 +233,11 @@ public: file_buf->sync(); } - uint64_t getStartIndex() const { return start_index; } + uint64_t getStartIndex() const + { + assert(current_file_description); + return current_file_description->from_log_index; + } void rotate(uint64_t new_start_log_index) { @@ -248,43 +266,42 @@ public: void finalize() { - if (file_buf) - finalizeCurrentFile(/* final */ true); + if (file_buf && prealloc_done) + finalizeCurrentFile(/*final*/ true); } private: void finalizeCurrentFile(bool final) { - assert(file_buf); + assert(file_buf && prealloc_done); + // reset compression buffer if (compress_logs) { - compressed_buffer->finalize(); + if (ZstdDeflatingAppendableWriteBuffer::isNeedToAddEmptyBlock(current_file_description->path)) + { + ZstdDeflatingAppendableWriteBuffer::addEmptyBlock(*file_buf); + assert(max_log_file_size == 0 || memory.size() < total_bytes_available - total_bytes_written); + total_bytes_written += ZstdDeflatingAppendableWriteBuffer::ZSTD_CORRECT_TERMINATION_LAST_BLOCK.size(); + } + WriteBufferMemory & cur_memory_buf = dynamic_cast(*compressed_buffer->getNestedBuffer()); - cur_memory_buf.finalize(); - - file_buf->write(memory.raw_data(), memory.size()); - flush(); - - assert(max_log_file_size == 0 || !prealloc_done || memory.size() < total_bytes_available - written); - written += memory.size(); + // so finalize can be done + cur_memory_buf.restart(); if (final) compressed_buffer.reset(); else compressed_buffer = std::make_unique(std::make_unique(memory), /* compression level = */ 3); } - else - { - flush(); - } + flush(); if (max_log_file_size != 0) - ftruncate(file_buf->getFD(), written); + ftruncate(file_buf->getFD(), total_bytes_written); file_buf.reset(); } @@ -301,7 +318,7 @@ private: if (max_log_file_size == 0) { total_bytes_available = 0; - written = 0; + total_bytes_written = 0; prealloc_done = true; return; } @@ -322,11 +339,11 @@ private: struct stat buf; { - int res = fstat(file_buf->getFD(), &buf); + [[maybe_unused]] int res = fstat(file_buf->getFD(), &buf); assert(res == 0); } - written = buf.st_size; + total_bytes_written = buf.st_size; total_bytes_available = fallocate_ok ? buf.st_blocks * 512 : max_log_file_size; // always leave space for last termination block @@ -341,29 +358,27 @@ private: using MemoryBuffer = PODArray; using WriteBufferMemory = WriteBufferFromVector; MemoryBuffer memory; + // we write everything to memory buffer to see how many bytes are required + // when we have that information we can try to write to the disk std::unique_ptr memory_buf; ChangelogFileDescriptionPtr current_file_description{nullptr}; std::unique_ptr file_buf; - size_t written{0}; + std::optional last_index_written; + size_t total_bytes_written{0}; size_t total_bytes_available{0}; std::unique_ptr compressed_buffer; bool prealloc_done{false}; - bool compress_logs; - - bool force_fsync; - - uint64_t rotate_interval; - - uint64_t max_log_file_size; + // Changelog configuration + const bool compress_logs; + const bool force_fsync; + const uint64_t rotate_interval; + const uint64_t max_log_file_size; const std::filesystem::path changelogs_dir; - - uint64_t start_index; - std::optional last_index; }; struct ChangelogReadResult @@ -1068,8 +1083,11 @@ void Changelog::shutdown() if (write_thread.joinable()) write_thread.join(); - current_writer->finalize(); - current_writer.reset(); + if (current_writer) + { + current_writer->finalize(); + current_writer.reset(); + } } Changelog::~Changelog() diff --git a/src/Coordination/tests/gtest_coordination.cpp b/src/Coordination/tests/gtest_coordination.cpp index 4e2a6a184d2..88d758c6867 100644 --- a/src/Coordination/tests/gtest_coordination.cpp +++ b/src/Coordination/tests/gtest_coordination.cpp @@ -2271,6 +2271,57 @@ TEST_P(CoordinationTest, TestSystemNodeModify) assert_create("/keeper1/test", Error::ZOK); } +TEST_P(CoordinationTest, ChangelogTestMaxLogSize) +{ + auto params = GetParam(); + ChangelogDirTest test("./logs"); + + uint64_t last_entry_index{0}; + size_t i{0}; + { + SCOPED_TRACE("Small rotation interval, big size limit"); + DB::KeeperLogStore changelog("./logs", 20, true, params.enable_compression, 50 * 1024 * 1024); + changelog.init(1, 0); + + for (; i < 100; ++i) + { + auto entry = getLogEntry(std::to_string(i) + "_hello_world", (i + 44) * 10); + last_entry_index = changelog.append(entry); + } + changelog.end_of_append_batch(0, 0); + + waitDurableLogs(changelog); + + ASSERT_EQ(changelog.entry_at(last_entry_index)->get_term(), (i - 1 + 44) * 10); + } + { + SCOPED_TRACE("Large rotation interval, small size limit"); + DB::KeeperLogStore changelog("./logs", 100'000, true, params.enable_compression, 4000); + changelog.init(1, 0); + + ASSERT_EQ(changelog.entry_at(last_entry_index)->get_term(), (i - 1 + 44) * 10); + + for (; i < 500; ++i) + { + auto entry = getLogEntry(std::to_string(i) + "_hello_world", (i + 44) * 10); + last_entry_index = changelog.append(entry); + } + changelog.end_of_append_batch(0, 0); + + waitDurableLogs(changelog); + + ASSERT_EQ(changelog.entry_at(last_entry_index)->get_term(), (i - 1 + 44) * 10); + } + { + SCOPED_TRACE("Final verify all logs"); + DB::KeeperLogStore changelog("./logs", 100'000, true, params.enable_compression, 4000); + changelog.init(1, 0); + ASSERT_EQ(changelog.entry_at(last_entry_index)->get_term(), (i - 1 + 44) * 10); + } + +} + + INSTANTIATE_TEST_SUITE_P(CoordinationTestSuite, CoordinationTest, ::testing::ValuesIn(std::initializer_list{ diff --git a/src/IO/WriteBufferFromVector.h b/src/IO/WriteBufferFromVector.h index 521acb6c8d6..da9947ee2f9 100644 --- a/src/IO/WriteBufferFromVector.h +++ b/src/IO/WriteBufferFromVector.h @@ -58,6 +58,12 @@ public: finalized = false; } + size_t valuesWritten() { + return ((position() - reinterpret_cast(vector.data())) /// NOLINT + + sizeof(typename VectorType::value_type) - 1) /// Align up. + / sizeof(typename VectorType::value_type); + } + ~WriteBufferFromVector() override { finalize(); @@ -66,10 +72,7 @@ public: private: void finalizeImpl() override { - vector.resize( - ((position() - reinterpret_cast(vector.data())) /// NOLINT - + sizeof(typename VectorType::value_type) - 1) /// Align up. - / sizeof(typename VectorType::value_type)); + vector.resize(valuesWritten()); /// Prevent further writes. set(nullptr, 0);