mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-29 21:20:49 +00:00
Slightly more optimal
This commit is contained in:
parent
af95db2fcf
commit
acf843a01a
@ -39,21 +39,15 @@ namespace
|
||||
|
||||
static constexpr auto DEFAULT_PREFIX = "changelog";
|
||||
|
||||
/// Pieces encoded in a changelog file name. formatChangelogPath() renders them as
/// "<prefix>_<from_log_idx>_<to_log_idx>.bin".
struct ChangelogName
|
||||
{
|
||||
/// Leading part of the file name (rotate() sets it to DEFAULT_PREFIX).
std::string prefix;
|
||||
/// Index of the first log entry the file is named for.
size_t from_log_idx;
|
||||
/// Index of the last log entry the file is named for (inclusive: rotate() computes it
/// as start + rotate_interval - 1).
size_t to_log_idx;
|
||||
};
|
||||
|
||||
/// Builds the path of a changelog file: `prefix` (the directory part; rotate() passes
/// changelogs_dir) joined with the file name
/// "<name.prefix>_<name.from_log_idx>_<name.to_log_idx>.bin".
/// Only the prefix/from_log_idx/to_log_idx fields of `name` are read.
std::string formatChangelogPath(const std::string & prefix, const ChangelogName & name)
|
||||
std::string formatChangelogPath(const std::string & prefix, const ChangelogFileDescription & name)
|
||||
{
|
||||
std::filesystem::path path(prefix);
|
||||
/// operator/= appends the file name as a new path component.
path /= std::filesystem::path(name.prefix + "_" + std::to_string(name.from_log_idx) + "_" + std::to_string(name.to_log_idx) + ".bin");
|
||||
return path;
|
||||
}
|
||||
|
||||
ChangelogName getChangelogName(const std::string & path_str)
|
||||
|
||||
ChangelogFileDescription getChangelogFileDescription(const std::string & path_str)
|
||||
{
|
||||
std::filesystem::path path(path_str);
|
||||
std::string filename = path.stem();
|
||||
@ -62,10 +56,11 @@ ChangelogName getChangelogName(const std::string & path_str)
|
||||
if (filename_parts.size() < 3)
|
||||
throw Exception(ErrorCodes::CORRUPTED_DATA, "Invalid changelog {}", path_str);
|
||||
|
||||
ChangelogName result;
|
||||
ChangelogFileDescription result;
|
||||
result.prefix = filename_parts[0];
|
||||
result.from_log_idx = parse<size_t>(filename_parts[1]);
|
||||
result.to_log_idx = parse<size_t>(filename_parts[2]);
|
||||
result.path = path_str;
|
||||
return result;
|
||||
}
|
||||
|
||||
@ -204,8 +199,8 @@ Changelog::Changelog(const std::string & changelogs_dir_, size_t rotate_interval
|
||||
|
||||
for (const auto & p : fs::directory_iterator(changelogs_dir))
|
||||
{
|
||||
auto name = getChangelogName(p.path());
|
||||
existing_changelogs[name.from_log_idx] = p.path();
|
||||
auto file_description = getChangelogFileDescription(p.path());
|
||||
existing_changelogs[file_description.from_log_idx] = file_description;
|
||||
}
|
||||
}
|
||||
|
||||
@ -214,22 +209,40 @@ void Changelog::readChangelogAndInitWriter(size_t from_log_idx)
|
||||
size_t read_from_last = 0;
|
||||
start_index = from_log_idx == 0 ? 1 : from_log_idx;
|
||||
size_t total_read = 0;
|
||||
for (const auto & [start_id, changelog_file] : existing_changelogs)
|
||||
size_t entries_in_last = 0;
|
||||
size_t incomplete_log_idx = 0;
|
||||
for (const auto & [start_idx, changelog_description] : existing_changelogs)
|
||||
{
|
||||
ChangelogName parsed_name = getChangelogName(changelog_file);
|
||||
if (parsed_name.to_log_idx >= from_log_idx)
|
||||
entries_in_last = changelog_description.to_log_idx - changelog_description.from_log_idx + 1;
|
||||
|
||||
if (changelog_description.to_log_idx >= from_log_idx)
|
||||
{
|
||||
ChangelogReader reader(changelog_file);
|
||||
ChangelogReader reader(changelog_description.path);
|
||||
read_from_last = reader.readChangelog(logs, from_log_idx, index_to_start_pos);
|
||||
total_read += read_from_last;
|
||||
|
||||
/// May happen after truncate and crash
|
||||
if (read_from_last < entries_in_last)
|
||||
{
|
||||
incomplete_log_idx = start_idx;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (existing_changelogs.size() > 0 && read_from_last < rotate_interval)
|
||||
if (incomplete_log_idx != 0)
|
||||
{
|
||||
auto str_name = existing_changelogs.rbegin()->second;
|
||||
auto parsed_name = getChangelogName(str_name);
|
||||
current_writer = std::make_unique<ChangelogWriter>(str_name, WriteMode::Append, parsed_name.from_log_idx);
|
||||
for (auto itr = existing_changelogs.upper_bound(incomplete_log_idx); itr != existing_changelogs.end();)
|
||||
{
|
||||
std::filesystem::remove(itr->second.path);
|
||||
itr = existing_changelogs.erase(itr);
|
||||
}
|
||||
}
|
||||
|
||||
if (existing_changelogs.size() > 0 && read_from_last < entries_in_last)
|
||||
{
|
||||
auto description = existing_changelogs.rbegin()->second;
|
||||
current_writer = std::make_unique<ChangelogWriter>(description.path, WriteMode::Append, description.from_log_idx);
|
||||
current_writer->setEntriesWritten(read_from_last);
|
||||
}
|
||||
else
|
||||
@ -243,14 +256,14 @@ void Changelog::rotate(size_t new_start_log_idx)
|
||||
if (current_writer)
|
||||
current_writer->flush();
|
||||
|
||||
ChangelogName new_name;
|
||||
new_name.prefix = DEFAULT_PREFIX;
|
||||
new_name.from_log_idx = new_start_log_idx;
|
||||
new_name.to_log_idx = new_start_log_idx + rotate_interval - 1;
|
||||
ChangelogFileDescription new_description;
|
||||
new_description.prefix = DEFAULT_PREFIX;
|
||||
new_description.from_log_idx = new_start_log_idx;
|
||||
new_description.to_log_idx = new_start_log_idx + rotate_interval - 1;
|
||||
|
||||
auto new_log_path = formatChangelogPath(changelogs_dir, new_name);
|
||||
existing_changelogs[new_start_log_idx] = new_log_path;
|
||||
current_writer = std::make_unique<ChangelogWriter>(new_log_path, WriteMode::Rewrite, new_start_log_idx);
|
||||
new_description.path = formatChangelogPath(changelogs_dir, new_description);
|
||||
existing_changelogs[new_start_log_idx] = new_description;
|
||||
current_writer = std::make_unique<ChangelogWriter>(new_description.path, WriteMode::Rewrite, new_start_log_idx);
|
||||
}
|
||||
|
||||
ChangelogRecord Changelog::buildRecord(size_t index, nuraft::ptr<nuraft::log_entry> log_entry) const
|
||||
@ -301,15 +314,14 @@ void Changelog::writeAt(size_t index, nuraft::ptr<nuraft::log_entry> log_entry)
|
||||
if (need_rollback)
|
||||
{
|
||||
auto index_changelog = existing_changelogs.lower_bound(index);
|
||||
std::string fname;
|
||||
ChangelogFileDescription description;
|
||||
if (index_changelog->first == index)
|
||||
fname = index_changelog->second;
|
||||
description = index_changelog->second;
|
||||
else
|
||||
fname = std::prev(index_changelog)->second;
|
||||
description = std::prev(index_changelog)->second;
|
||||
|
||||
current_writer = std::make_unique<ChangelogWriter>(fname, WriteMode::Append, index_changelog->first);
|
||||
auto formated_name = getChangelogName(fname);
|
||||
current_writer->setEntriesWritten(formated_name.to_log_idx - formated_name.from_log_idx + 1);
|
||||
current_writer = std::make_unique<ChangelogWriter>(description.path, WriteMode::Append, index_changelog->first);
|
||||
current_writer->setEntriesWritten(description.to_log_idx - description.from_log_idx + 1);
|
||||
}
|
||||
|
||||
auto entries_written = current_writer->getEntriesWritten();
|
||||
@ -320,7 +332,7 @@ void Changelog::writeAt(size_t index, nuraft::ptr<nuraft::log_entry> log_entry)
|
||||
auto to_remove_itr = existing_changelogs.upper_bound(index);
|
||||
for (auto itr = to_remove_itr; itr != existing_changelogs.end();)
|
||||
{
|
||||
std::filesystem::remove(itr->second);
|
||||
std::filesystem::remove(itr->second.path);
|
||||
itr = existing_changelogs.erase(itr);
|
||||
}
|
||||
}
|
||||
@ -342,17 +354,16 @@ void Changelog::compact(size_t up_to_log_idx)
|
||||
{
|
||||
for (auto itr = existing_changelogs.begin(); itr != existing_changelogs.end();)
|
||||
{
|
||||
ChangelogName parsed_name = getChangelogName(itr->second);
|
||||
if (parsed_name.to_log_idx <= up_to_log_idx)
|
||||
if (itr->second.to_log_idx <= up_to_log_idx)
|
||||
{
|
||||
for (size_t idx = parsed_name.from_log_idx; idx <= parsed_name.to_log_idx; ++idx)
|
||||
for (size_t idx = itr->second.from_log_idx; idx <= itr->second.to_log_idx; ++idx)
|
||||
{
|
||||
auto index_pos = index_to_start_pos.find(idx);
|
||||
if (index_pos == index_to_start_pos.end())
|
||||
break;
|
||||
index_to_start_pos.erase(index_pos);
|
||||
}
|
||||
std::filesystem::remove(itr->second);
|
||||
std::filesystem::remove(itr->second.path);
|
||||
itr = existing_changelogs.erase(itr);
|
||||
}
|
||||
else
|
||||
@ -366,7 +377,6 @@ void Changelog::compact(size_t up_to_log_idx)
|
||||
|
||||
LogEntryPtr Changelog::getLastEntry() const
|
||||
{
|
||||
|
||||
static LogEntryPtr fake_entry = nuraft::cs_new<nuraft::log_entry>(0, nuraft::buffer::alloc(sizeof(size_t)));
|
||||
|
||||
size_t next_idx = getNextEntryIndex() - 1;
|
||||
|
@ -45,6 +45,15 @@ struct ChangelogRecord
|
||||
nuraft::ptr<nuraft::buffer> blob;
|
||||
};
|
||||
|
||||
/// Description of one changelog file: the components parsed from (or used to build) its
/// file name plus the path of the file itself. Stored as the mapped value of
/// Changelog::existing_changelogs so callers can read the index range without re-parsing
/// the file name.
struct ChangelogFileDescription
|
||||
{
|
||||
/// Leading part of the file name, before the first '_'-separated index.
std::string prefix;
|
||||
/// Index of the first log entry the file is named for.
size_t from_log_idx;
|
||||
/// Index of the last log entry the file is named for (inclusive; callers compute the
/// expected entry count as to_log_idx - from_log_idx + 1).
size_t to_log_idx;
|
||||
|
||||
/// Path used to open or remove the file (passed to ChangelogReader/ChangelogWriter
/// and std::filesystem::remove).
std::string path;
|
||||
};
|
||||
|
||||
class ChangelogWriter;
|
||||
|
||||
class Changelog
|
||||
@ -98,7 +107,7 @@ private:
|
||||
|
||||
private:
|
||||
std::string changelogs_dir;
|
||||
std::map<size_t, std::string> existing_changelogs;
|
||||
std::map<size_t, ChangelogFileDescription> existing_changelogs;
|
||||
std::unique_ptr<ChangelogWriter> current_writer;
|
||||
IndexToOffset index_to_start_pos;
|
||||
const size_t rotate_interval;
|
||||
|
@ -36,7 +36,9 @@ struct ChangelogDirTest
|
||||
, drop(drop_)
|
||||
{
|
||||
if (fs::exists(path))
|
||||
{
|
||||
EXPECT_TRUE(false) << "Path " << path << " already exists, remove it to run test";
|
||||
}
|
||||
fs::create_directory(path);
|
||||
}
|
||||
|
||||
@ -810,4 +812,59 @@ TEST(CoordinationTest, ChangelogTestStartNewLogAfterRead)
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_36_40.bin"));
|
||||
}
|
||||
|
||||
|
||||
/// Checks how the log store recovers when a changelog file in the middle of the sequence
/// has been emptied: entries from the files before it are kept, the emptied file stays on
/// disk, the files after it are removed, and appending works afterwards.
TEST(CoordinationTest, ChangelogTestReadAfterBrokenTruncate)
|
||||
{
|
||||
ChangelogDirTest test("./logs");
|
||||
|
||||
/// NOTE(review): the second argument (5) is presumably the rotate interval; it matches
/// the 5-index span of the file names asserted below - confirm against NuKeeperLogStore.
DB::NuKeeperLogStore changelog("./logs", 5);
|
||||
changelog.init(1);
|
||||
|
||||
/// Append 35 entries. The second getLogEntry argument (i * 10) is what get_term() is
/// compared against later in this test.
for (size_t i = 0; i < 35; ++i)
|
||||
{
|
||||
auto entry = getLogEntry(std::to_string(i) + "_hello_world", i * 10);
|
||||
changelog.append(entry);
|
||||
}
|
||||
EXPECT_EQ(changelog.size(), 35);
|
||||
/// 35 entries are expected to be spread over seven files of five indices each.
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin"));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin"));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin"));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_16_20.bin"));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_21_25.bin"));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_26_30.bin"));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_31_35.bin"));
|
||||
|
||||
/// Simulate the broken state: empty the third file (indices 11-15) while leaving the
/// later files in place.
DB::WriteBufferFromFile plain_buf("./logs/changelog_11_15.bin", DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY);
|
||||
plain_buf.truncate(0);
|
||||
|
||||
/// Re-open the same directory with a fresh store instance.
DB::NuKeeperLogStore changelog_reader("./logs", 5);
|
||||
changelog_reader.init(1);
|
||||
|
||||
/// Only the entries of the first two files remain; the last one is the entry appended
/// with i == 9, i.e. term 90.
EXPECT_EQ(changelog_reader.size(), 10);
|
||||
EXPECT_EQ(changelog_reader.last_entry()->get_term(), 90);
|
||||
|
||||
/// The intact files and the emptied file are still present...
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin"));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin"));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin"));
|
||||
|
||||
/// ...while every file after the emptied one must have been removed.
EXPECT_FALSE(fs::exists("./logs/changelog_16_20.bin"));
|
||||
EXPECT_FALSE(fs::exists("./logs/changelog_21_25.bin"));
|
||||
EXPECT_FALSE(fs::exists("./logs/changelog_26_30.bin"));
|
||||
EXPECT_FALSE(fs::exists("./logs/changelog_31_35.bin"));
|
||||
|
||||
/// Appending after recovery must succeed and become the new last entry.
auto entry = getLogEntry("h", 7777);
|
||||
changelog_reader.append(entry);
|
||||
EXPECT_EQ(changelog_reader.size(), 11);
|
||||
EXPECT_EQ(changelog_reader.last_entry()->get_term(), 7777);
|
||||
|
||||
/// The set of files on disk is unchanged by the append.
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin"));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin"));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin"));
|
||||
|
||||
EXPECT_FALSE(fs::exists("./logs/changelog_16_20.bin"));
|
||||
EXPECT_FALSE(fs::exists("./logs/changelog_21_25.bin"));
|
||||
EXPECT_FALSE(fs::exists("./logs/changelog_26_30.bin"));
|
||||
EXPECT_FALSE(fs::exists("./logs/changelog_31_35.bin"));
|
||||
}
|
||||
|
||||
#endif
|
||||
|
Loading…
Reference in New Issue
Block a user