make it better

This commit is contained in:
Antonio Andelic 2023-01-09 12:01:22 +00:00
parent a4f35d5da9
commit 27b2bae640

View File

@ -321,8 +321,6 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin
else
start_to_read_from = 1;
SCOPE_EXIT({ initialized = true; });
/// Got through changelog files in order of start_index
for (const auto & [changelog_start_index, changelog_description] : existing_changelogs)
{
@ -343,6 +341,7 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin
min_log_id = last_commited_log_index;
max_log_id = last_commited_log_index == 0 ? 0 : last_commited_log_index - 1;
rotate(max_log_id + 1, writer_lock);
initialized = true;
return;
}
else if (changelog_description.from_log_index > start_to_read_from)
@ -434,6 +433,8 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin
/// Start new log if we don't initialize writer from previous log. All logs can be "complete".
if (!current_writer)
rotate(max_log_id + 1, writer_lock);
initialized = true;
}
@ -611,7 +612,11 @@ void Changelog::appendEntry(uint64_t index, const LogEntryPtr & log_entry)
void Changelog::writeAt(uint64_t index, const LogEntryPtr & log_entry)
{
if (!initialized)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Changelog must be initialized before writting records");
{
std::lock_guard lock(writer_mutex);
/// This write_at require to overwrite everything in this file and also in previous file(s)
const bool go_to_previous_file = index < current_writer->getStartIndex();
@ -650,6 +655,9 @@ void Changelog::writeAt(uint64_t index, const LogEntryPtr & log_entry)
void Changelog::compact(uint64_t up_to_log_index)
{
if (!initialized)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Changelog must be initialized before compacting records");
std::lock_guard lock(writer_mutex);
LOG_INFO(log, "Compact logs up to log index {}, our max log id is {}", up_to_log_index, max_log_id);
@ -810,6 +818,9 @@ void Changelog::flush()
bool Changelog::flushAsync()
{
if (!initialized)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Changelog must be initialized before flushing records");
bool pushed = write_operations.push(Flush{max_log_id});
if (!pushed)
LOG_WARNING(log, "Changelog is shut down");