Fix tests

This commit is contained in:
Antonio Andelic 2022-12-21 08:43:55 +00:00
parent 6d83221d4e
commit 9711355cc9
6 changed files with 158 additions and 108 deletions

View File

@ -36,20 +36,27 @@ std::string formatChangelogPath(
return path;
}
ChangelogFileDescription getChangelogFileDescription(const std::filesystem::path & path)
ChangelogFileDescriptionPtr getChangelogFileDescription(const std::filesystem::path & path)
{
std::string filename = path.stem();
// we can have .bin.zstd so we cannot use std::filesystem stem and extension
std::string filename_with_extension = path.filename();
std::string_view filename_with_extension_view = filename_with_extension;
auto first_dot = filename_with_extension.find('.');
if (first_dot == std::string::npos)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Invalid changelog file {}", path.generic_string());
Strings filename_parts;
boost::split(filename_parts, filename, boost::is_any_of("_"));
boost::split(filename_parts, filename_with_extension_view.substr(0, first_dot), boost::is_any_of("_"));
if (filename_parts.size() < 3)
throw Exception(ErrorCodes::CORRUPTED_DATA, "Invalid changelog {}", path.generic_string());
ChangelogFileDescription result;
result.prefix = filename_parts[0];
result.from_log_index = parse<uint64_t>(filename_parts[1]);
result.to_log_index = parse<uint64_t>(filename_parts[2]);
result.extension = path.extension();
result.path = path.generic_string();
auto result = std::make_shared<ChangelogFileDescription>();
result->prefix = filename_parts[0];
result->from_log_index = parse<uint64_t>(filename_parts[1]);
result->to_log_index = parse<uint64_t>(filename_parts[2]);
result->extension = std::string(filename_with_extension.substr(first_dot + 1));
result->path = path.generic_string();
return result;
}
@ -72,7 +79,7 @@ class ChangelogWriter
{
public:
ChangelogWriter(
std::map<uint64_t, ChangelogFileDescription> & existing_changelogs_,
std::map<uint64_t, ChangelogFileDescriptionPtr> & existing_changelogs_,
bool compress_logs_,
bool force_fsync_,
const std::filesystem::path & changelogs_dir_,
@ -96,15 +103,36 @@ public:
}
}
void setFile(ChangelogFileDescription & file_description, WriteMode mode)
void setFile(ChangelogFileDescriptionPtr file_description, WriteMode mode)
{
file_buf = std::make_unique<WriteBufferFromFile>(
file_description.path, DBMS_DEFAULT_BUFFER_SIZE, mode == WriteMode::Rewrite ? -1 : (O_APPEND | O_CREAT | O_WRONLY));
start_index = file_description.from_log_index;
last_index = start_index;
current_file_description = &file_description;
if (mode == WriteMode::Append && file_description->expectedEntriesCountInLog() != rotate_interval)
LOG_TRACE(&Poco::Logger::get("Changelog"), "Looks like rotate_logs_interval was changed, current {}, expected entries in last log {}", rotate_interval, file_description->expectedEntriesCountInLog());
if (compress_logs && mode == WriteMode::Append && ZstdDeflatingAppendableWriteBuffer::isNeedToAddEmptyBlock(file_description.path))
if (file_buf)
{
finalizeCurrentFile(/* final */ false);
assert(current_file_description);
if (last_index != current_file_description->to_log_index)
{
auto new_path = formatChangelogPath(
changelogs_dir,
current_file_description->prefix,
current_file_description->from_log_index,
last_index,
current_file_description->extension);
std::filesystem::rename(current_file_description->path, new_path);
current_file_description->path = std::move(new_path);
}
}
file_buf = std::make_unique<WriteBufferFromFile>(
file_description->path, DBMS_DEFAULT_BUFFER_SIZE, mode == WriteMode::Rewrite ? -1 : (O_APPEND | O_CREAT | O_WRONLY));
start_index = file_description->from_log_index;
last_index = start_index;
current_file_description = std::move(file_description);
if (compress_logs && mode == WriteMode::Append && ZstdDeflatingAppendableWriteBuffer::isNeedToAddEmptyBlock(current_file_description->path))
{
ZstdDeflatingAppendableWriteBuffer::addEmptyBlock(*file_buf);
file_buf->sync();
@ -118,7 +146,7 @@ public:
bool appendRecord(ChangelogRecord && record)
{
const bool log_is_complete = record.header.index - getStartIndex() == rotate_interval;
const bool log_is_complete = record.header.index - getStartIndex() == current_file_description->expectedEntriesCountInLog();
if (log_is_complete)
rotate(record.header.index);
@ -146,7 +174,7 @@ public:
if (compressed_buffer)
{
/// Flush compressed data to WriteBufferFromFile working_buffer
/// Flush compressed data to WriteBufferMemory working_buffer
compressed_buffer->next();
}
@ -156,7 +184,7 @@ public:
cur_memory_buf.finalize();
// if the file is too big, rotate
if (total_bytes_available != 0 && written + memory.size() > total_bytes_available)
if (max_log_file_size != 0 && written + memory.size() > total_bytes_available)
{
rotate(record.header.index);
@ -185,47 +213,24 @@ public:
void rotate(uint64_t new_start_log_index)
{
if (file_buf)
{
if (compress_logs && ZstdDeflatingAppendableWriteBuffer::isNeedToAddEmptyBlock(current_file_description->path))
{
ZstdDeflatingAppendableWriteBuffer::addEmptyBlock(*file_buf);
file_buf->sync();
written += ZstdDeflatingAppendableWriteBuffer::ZSTD_CORRECT_TERMINATION_LAST_BLOCK.size();
}
flush();
ftruncate(file_buf->getFD(), written);
auto new_path = formatChangelogPath(
changelogs_dir,
current_file_description->prefix,
current_file_description->from_log_index,
last_index,
current_file_description->extension);
std::filesystem::rename(current_file_description->path, new_path);
current_file_description->path = std::move(new_path);
}
/// Start new one
ChangelogFileDescription new_description;
new_description.prefix = DEFAULT_PREFIX;
new_description.from_log_index = new_start_log_index;
new_description.to_log_index = new_start_log_index + rotate_interval - 1;
new_description.extension = "bin";
auto new_description = std::make_shared<ChangelogFileDescription>();
new_description->prefix = DEFAULT_PREFIX;
new_description->from_log_index = new_start_log_index;
new_description->to_log_index = new_start_log_index + rotate_interval - 1;
new_description->extension = "bin";
if (compress_logs)
new_description.extension += "." + toContentEncodingName(CompressionMethod::Zstd);
new_description->extension += "." + toContentEncodingName(CompressionMethod::Zstd);
new_description.path = formatChangelogPath(
new_description->path = formatChangelogPath(
changelogs_dir,
new_description.prefix,
new_description->prefix,
new_start_log_index,
new_start_log_index + rotate_interval - 1,
new_description.extension);
new_description->extension);
LOG_TRACE(&Poco::Logger::get("Changelog"), "Starting new changelog {}", new_description.path);
LOG_TRACE(&Poco::Logger::get("Changelog"), "Starting new changelog {}", new_description->path);
auto [it, inserted] = existing_changelogs.insert(std::make_pair(new_start_log_index, std::move(new_description)));
setFile(it->second, WriteMode::Rewrite);
@ -234,10 +239,46 @@ public:
void finalize()
{
if (file_buf)
ftruncate(file_buf->getFD(), written);
finalizeCurrentFile(/* final */ true);
}
private:
void finalizeCurrentFile(bool final)
{
assert(file_buf);
// reset compression buffer
if (compress_logs)
{
compressed_buffer->finalize();
WriteBufferMemory & cur_memory_buf
= dynamic_cast<WriteBufferMemory &>(*compressed_buffer->getNestedBuffer());
cur_memory_buf.finalize();
file_buf->write(memory.raw_data(), memory.size());
flush();
assert(max_log_file_size == 0 || memory.size() < total_bytes_available - written);
written += memory.size();
if (final)
compressed_buffer.reset();
else
compressed_buffer = std::make_unique<ZstdDeflatingAppendableWriteBuffer>(std::make_unique<WriteBufferMemory>(memory), /* compression level = */ 3);
}
else
{
flush();
}
if (max_log_file_size != 0)
ftruncate(file_buf->getFD(), written);
file_buf.reset();
}
WriteBuffer & getBuffer()
{
if (compressed_buffer)
@ -250,6 +291,7 @@ private:
if (max_log_file_size == 0)
{
total_bytes_available = 0;
written = 0;
prealloc_done = true;
return;
}
@ -277,14 +319,14 @@ private:
prealloc_done = true;
}
std::map<uint64_t, ChangelogFileDescription> & existing_changelogs;
std::map<uint64_t, ChangelogFileDescriptionPtr> & existing_changelogs;
using MemoryBuffer = PODArray<UInt8>;
using WriteBufferMemory = WriteBufferFromVector<MemoryBuffer>;
MemoryBuffer memory;
std::unique_ptr<WriteBufferMemory> memory_buf;
ChangelogFileDescription * current_file_description{nullptr};
ChangelogFileDescriptionPtr current_file_description{nullptr};
std::unique_ptr<WriteBufferFromFile> file_buf;
size_t written{0};
size_t total_bytes_available{0};
@ -459,7 +501,7 @@ Changelog::Changelog(
continue;
auto file_description = getChangelogFileDescription(p.path());
existing_changelogs[file_description.from_log_index] = file_description;
existing_changelogs[file_description->from_log_index] = std::move(file_description);
}
if (existing_changelogs.empty())
@ -490,9 +532,12 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin
else
start_to_read_from = 1;
SCOPE_EXIT({ initialized = true; });
/// Got through changelog files in order of start_index
for (const auto & [changelog_start_index, changelog_description] : existing_changelogs)
for (const auto & [changelog_start_index, changelog_description_ptr] : existing_changelogs)
{
const auto & changelog_description = *changelog_description_ptr;
/// [from_log_index.>=.......start_to_read_from.....<=.to_log_index]
if (changelog_description.to_log_index >= start_to_read_from)
{
@ -602,12 +647,12 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin
assert(existing_changelogs.find(last_log_read_result->log_start_index)->first == existing_changelogs.rbegin()->first);
/// Continue to write into incomplete existing log if it doesn't finished with error
auto & description = existing_changelogs[last_log_read_result->log_start_index];
const auto & description = existing_changelogs[last_log_read_result->log_start_index];
if (last_log_read_result->last_read_index == 0 || last_log_read_result->error) /// If it's broken log then remove it
{
LOG_INFO(log, "Removing chagelog {} because it's empty or read finished with error", description.path);
std::filesystem::remove(description.path);
LOG_INFO(log, "Removing chagelog {} because it's empty or read finished with error", description->path);
std::filesystem::remove(description->path);
existing_changelogs.erase(last_log_read_result->log_start_index);
std::erase_if(logs, [last_log_read_result](const auto & item) { return item.first >= last_log_read_result->log_start_index; });
}
@ -620,32 +665,30 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin
/// Start new log if we don't initialize writer from previous log. All logs can be "complete".
if (!current_writer->isFileSet())
current_writer->rotate(max_log_id + 1);
initialized = true;
}
void Changelog::initWriter(ChangelogFileDescription & description)
void Changelog::initWriter(ChangelogFileDescriptionPtr description)
{
LOG_TRACE(log, "Continue to write into {}", description.path);
current_writer->setFile(description, WriteMode::Append);
LOG_TRACE(log, "Continue to write into {}", description->path);
current_writer->setFile(std::move(description), WriteMode::Append);
}
namespace
{
std::string getCurrentTimestampFolder()
{
const auto timestamp = LocalDateTime{std::time(nullptr)};
return fmt::format(
"{:02}{:02}{:02}T{:02}{:02}{:02}",
timestamp.year(),
timestamp.month(),
timestamp.day(),
timestamp.hour(),
timestamp.minute(),
timestamp.second());
}
std::string getCurrentTimestampFolder()
{
const auto timestamp = LocalDateTime{std::time(nullptr)};
return fmt::format(
"{:02}{:02}{:02}T{:02}{:02}{:02}",
timestamp.year(),
timestamp.month(),
timestamp.day(),
timestamp.hour(),
timestamp.minute(),
timestamp.second());
}
}
@ -661,8 +704,8 @@ void Changelog::removeExistingLogs(ChangelogIter begin, ChangelogIter end)
std::filesystem::create_directories(timestamp_folder);
}
LOG_WARNING(log, "Removing changelog {}", itr->second.path);
const std::filesystem::path path = itr->second.path;
LOG_WARNING(log, "Removing changelog {}", itr->second->path);
const std::filesystem::path & path = itr->second->path;
const auto new_path = timestamp_folder / path.filename();
std::filesystem::rename(path, new_path);
itr = existing_changelogs.erase(itr);
@ -788,21 +831,20 @@ void Changelog::writeAt(uint64_t index, const LogEntryPtr & log_entry)
{
auto index_changelog = existing_changelogs.lower_bound(index);
ChangelogFileDescription * description;
ChangelogFileDescriptionPtr description{nullptr};
if (index_changelog->first == index) /// exactly this file starts from index
description = &index_changelog->second;
description = index_changelog->second;
else
description = &std::prev(index_changelog)->second;
description = std::prev(index_changelog)->second;
current_writer->flush();
current_writer->setFile(*description, WriteMode::Append);
current_writer->setFile(std::move(description), WriteMode::Append);
/// Remove all subsequent files if overwritten something in previous one
auto to_remove_itr = existing_changelogs.upper_bound(index);
for (auto itr = to_remove_itr; itr != existing_changelogs.end();)
{
std::filesystem::remove(itr->second.path);
std::filesystem::remove(itr->second->path);
itr = existing_changelogs.erase(itr);
}
}
@ -833,29 +875,29 @@ void Changelog::compact(uint64_t up_to_log_index)
bool need_rotate = false;
for (auto itr = existing_changelogs.begin(); itr != existing_changelogs.end();)
{
const auto & changelog_description = *itr->second;
/// Remove all completely outdated changelog files
if (remove_all_logs || itr->second.to_log_index <= up_to_log_index)
if (remove_all_logs || changelog_description.to_log_index <= up_to_log_index)
{
if (current_writer && itr->second.from_log_index == current_writer->getStartIndex())
if (current_writer && changelog_description.from_log_index == current_writer->getStartIndex())
{
LOG_INFO(
log,
"Trying to remove log {} which is current active log for write. Possibly this node recovers from snapshot",
itr->second.path);
changelog_description.path);
need_rotate = true;
current_writer.reset();
}
LOG_INFO(log, "Removing changelog {} because of compaction", itr->second.path);
LOG_INFO(log, "Removing changelog {} because of compaction", changelog_description.path);
/// If failed to push to queue for background removing, then we will remove it now
if (!log_files_to_delete_queue.tryPush(itr->second.path, 1))
if (!log_files_to_delete_queue.tryPush(changelog_description.path, 1))
{
std::error_code ec;
std::filesystem::remove(itr->second.path, ec);
std::filesystem::remove(changelog_description.path, ec);
if (ec)
LOG_WARNING(log, "Failed to remove changelog {} in compaction, error message: {}", itr->second.path, ec.message());
LOG_WARNING(log, "Failed to remove changelog {} in compaction, error message: {}", changelog_description.path, ec.message());
else
LOG_INFO(log, "Removed changelog {} because of compaction", itr->second.path);
LOG_INFO(log, "Removed changelog {} because of compaction", changelog_description.path);
}
itr = existing_changelogs.erase(itr);
@ -1010,6 +1052,7 @@ void Changelog::shutdown()
write_thread.join();
current_writer->finalize();
current_writer.reset();
}
Changelog::~Changelog()

View File

@ -64,6 +64,8 @@ struct ChangelogFileDescription
uint64_t expectedEntriesCountInLog() const { return to_log_index - from_log_index + 1; }
};
using ChangelogFileDescriptionPtr = std::shared_ptr<ChangelogFileDescription>;
class ChangelogWriter;
/// Simplest changelog with files rotation.
@ -140,7 +142,7 @@ private:
static ChangelogRecord buildRecord(uint64_t index, const LogEntryPtr & log_entry);
/// Currently existing changelogs
std::map<uint64_t, ChangelogFileDescription> existing_changelogs;
std::map<uint64_t, ChangelogFileDescriptionPtr> existing_changelogs;
using ChangelogIter = decltype(existing_changelogs)::iterator;
void removeExistingLogs(ChangelogIter begin, ChangelogIter end);
@ -151,7 +153,7 @@ private:
/// Remove all logs from disk
void removeAllLogs();
/// Init writer for existing log with some entries already written
void initWriter(ChangelogFileDescription & description);
void initWriter(ChangelogFileDescriptionPtr description);
/// Clean useless log files in a background thread
void cleanLogThread();

View File

@ -235,6 +235,7 @@ TEST_P(CoordinationTest, ChangelogTestSimple)
{
auto params = GetParam();
ChangelogDirTest test("./logs");
DB::KeeperLogStore changelog("./logs", 5, true, params.enable_compression);
changelog.init(1, 0);
auto entry = getLogEntry("hello world", 77);
@ -405,6 +406,7 @@ TEST_P(CoordinationTest, ChangelogTestAppendAfterRead)
EXPECT_EQ(changelog_reader.size(), 11);
waitDurableLogs(changelog_reader);
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin" + params.extension));
@ -874,7 +876,7 @@ TEST_P(CoordinationTest, ChangelogTestReadAfterBrokenTruncate2)
EXPECT_TRUE(fs::exists("./logs/changelog_21_40.bin" + params.extension));
DB::WriteBufferFromFile plain_buf("./logs/changelog_1_20.bin" + params.extension, DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY);
plain_buf.truncate(140);
plain_buf.truncate(30);
DB::KeeperLogStore changelog_reader("./logs", 20, true, params.enable_compression);
changelog_reader.init(1, 0);
@ -2161,6 +2163,7 @@ TEST_P(CoordinationTest, TestDurableState)
const auto reload_state_manager = [&]
{
state_manager.emplace(1, "localhost", 9181, "./logs", "./state");
state_manager->loadLogStore(1, 0);
};
reload_state_manager();

View File

@ -39,12 +39,6 @@ void ZstdDeflatingAppendableWriteBuffer::nextImpl()
input.size = offset();
input.pos = 0;
//if (first_write && append_to_existing_file && isNeedToAddEmptyBlock())
//{
// addEmptyBlock();
// first_write = false;
//}
try
{
bool ended = false;
@ -134,6 +128,16 @@ void ZstdDeflatingAppendableWriteBuffer::finalizeBefore()
throw Exception(ErrorCodes::ZSTD_ENCODER_FAILED, "Zstd stream encoder end failed: error: '{}' zstd version: {}", ZSTD_getErrorName(remaining), ZSTD_VERSION_STRING);
remaining = ZSTD_compressStream2(cctx, &output, &input, ZSTD_e_end);
out->position() = out->buffer().begin() + output.pos;
if (!out->hasPendingData())
{
out->next();
output.dst = reinterpret_cast<unsigned char *>(out->buffer().begin());
output.size = out->buffer().size();
output.pos = out->offset();
}
}
out->position() = out->buffer().begin() + output.pos;
}

View File

@ -69,12 +69,10 @@ private:
std::unique_ptr<WriteBuffer> out;
//bool append_to_existing_file = false;
ZSTD_CCtx * cctx;
ZSTD_inBuffer input;
ZSTD_outBuffer output;
/// Flipped on the first nextImpl call
bool first_write = true;
bool first_write;
};
}

View File

@ -58,7 +58,7 @@ bool ZstdInflatingReadBuffer::nextImpl()
size_t ret = ZSTD_decompressStream(dctx, &output, &input);
if (ZSTD_isError(ret))
throw Exception(
ErrorCodes::ZSTD_DECODER_FAILED, "Zstd stream encoding failed: error '{}'; zstd version: {}", ZSTD_getErrorName(ret), ZSTD_VERSION_STRING);
ErrorCodes::ZSTD_DECODER_FAILED, "Zstd stream decoding failed: error '{}'; zstd version: {}", ZSTD_getErrorName(ret), ZSTD_VERSION_STRING);
/// Check that something has changed after decompress (input or output position)
assert(in->eof() || output.pos > 0 || in->position() < in->buffer().begin() + input.pos);