Review fixes

This commit is contained in:
alesapin 2021-09-02 12:20:11 +03:00
parent 9536da9319
commit a8003e444b
4 changed files with 115 additions and 78 deletions

View File

@ -142,12 +142,22 @@ private:
struct ChangelogReadResult
{
/// Total entries read from log including skipped
uint64_t entries_read;
/// Total entries read from log including skipped.
/// Useful when we decide to continue to write in the same log and want to know
/// how many entries was already written in it.
uint64_t total_entries_read_from_log;
/// First index in log
uint64_t log_start_index;
/// First entry actually read log (not including skipped)
uint64_t first_read_index;
/// Last entry read from log
/// Last entry read from log (last entry in log)
/// When we don't skip anything last_read_index - first_read_index = total_entries_read_from_log.
/// But when some entries from the start of log can be skipped because they are not required.
uint64_t last_read_index;
/// last offset we were able to read from log
off_t last_position;
bool error;
};
@ -160,6 +170,7 @@ public:
, read_buf(filepath)
{}
/// start_log_index -- all entries with index < start_log_index will be skipped, but accounted into total_entries_read_from_log
ChangelogReadResult readChangelog(IndexToLogEntry & logs, uint64_t start_log_index, IndexToOffset & index_to_offset, Poco::Logger * log)
{
uint64_t previous_index = 0;
@ -214,7 +225,7 @@ public:
if (logs.count(record.header.index) != 0)
throw Exception(ErrorCodes::CORRUPTED_DATA, "Duplicated index id {} in log {}", record.header.index, filepath);
result.entries_read += 1;
result.total_entries_read_from_log += 1;
/// Read but skip this entry because our state is already more fresh
if (record.header.index < start_log_index)
@ -230,8 +241,8 @@ public:
index_to_offset[record.header.index] = result.last_position;
result.last_read_index = record.header.index;
if (result.entries_read % 50000 == 0)
LOG_TRACE(log, "Reading changelog from path {}, entries {}", filepath, result.entries_read);
if (result.total_entries_read_from_log % 50000 == 0)
LOG_TRACE(log, "Reading changelog from path {}, entries {}", filepath, result.total_entries_read_from_log);
}
}
catch (const Exception & ex)
@ -248,7 +259,7 @@ public:
tryLogCurrentException(log);
}
LOG_TRACE(log, "Totally read from changelog {} {} entries", filepath, result.entries_read);
LOG_TRACE(log, "Totally read from changelog {} {} entries", filepath, result.total_entries_read_from_log);
return result;
}
@ -285,13 +296,10 @@ Changelog::Changelog(
void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uint64_t logs_to_keep)
{
uint64_t total_read = 0;
std::optional<ChangelogReadResult> last_log_read_result;
/// Log idx of the first incomplete log (key in existing_changelogs)
/// There must be no other logs after incomplete one.
int64_t first_incomplete_log_start_index = -1; /// if -1 then no incomplete log exists
ChangelogReadResult last_log_read_result{};
/// Last log has some free space to write
bool last_log_is_not_complete = false;
/// We must start to read from this log index
uint64_t start_to_read_from = last_commited_log_index;
@ -302,19 +310,14 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin
else
start_to_read_from = 1;
/// At least we read something
bool started = false;
/// Got through changelog files in order of start_index
for (const auto & [changelog_start_index, changelog_description] : existing_changelogs)
{
/// How many entries we have in the last changelog
uint64_t expected_entries_in_log = changelog_description.expectedEntriesCountInLog();
/// [from_log_index.>=.......start_to_read_from.....<=.to_log_index]
if (changelog_description.to_log_index >= start_to_read_from)
{
if (!started) /// still nothing was read
if (!last_log_read_result) /// still nothing was read
{
/// Our first log starts from the more fresh log_id than we required to read and this changelog is not empty log.
/// So we are missing something in our logs, but it's not dataloss, we will receive snapshot and required
@ -322,8 +325,12 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin
if (changelog_description.from_log_index > last_commited_log_index && (changelog_description.from_log_index - last_commited_log_index) > 1)
{
LOG_ERROR(log, "Some records was lost, last committed log index {}, smallest available log index on disk {}. Hopefully will receive missing records from leader.", last_commited_log_index, changelog_description.from_log_index);
first_incomplete_log_start_index = changelog_start_index;
break;
/// Nothing to do with our more fresh log, leader will overwrite them, so remove everything and just start from last_commited_index
removeAllLogs();
min_log_id = last_commited_log_index;
max_log_id = last_commited_log_index == 0 ? 0 : last_commited_log_index - 1;
rotate(max_log_id + 1);
return;
}
else if (changelog_description.from_log_index > start_to_read_from)
{
@ -336,71 +343,98 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin
ChangelogReader reader(changelog_description.path);
last_log_read_result = reader.readChangelog(logs, start_to_read_from, index_to_start_pos, log);
started = true;
/// Otherwise we have already initialized it
if (min_log_id == 0)
min_log_id = last_log_read_result.first_read_index;
min_log_id = last_log_read_result->first_read_index;
total_read += last_log_read_result.entries_read;
if (last_log_read_result->last_read_index != 0)
max_log_id = last_log_read_result->last_read_index;
if (last_log_read_result.last_read_index != 0)
max_log_id = last_log_read_result.last_read_index;
last_log_read_result->log_start_index = changelog_description.from_log_index;
/// How many entries we have in the last changelog
uint64_t expected_entries_in_log = changelog_description.expectedEntriesCountInLog();
/// May happen after truncate, crash or simply unfinished log
if (last_log_read_result.entries_read < expected_entries_in_log)
if (last_log_read_result->total_entries_read_from_log < expected_entries_in_log)
{
first_incomplete_log_start_index = changelog_start_index;
last_log_is_not_complete = true;
break;
}
}
}
if (min_log_id == 0) /// We just may have no logs (only snapshot or nothing)
/// we can have empty log (with zero entries) and last_log_read_result will be initialized
if (!last_log_read_result || min_log_id == 0) /// We just may have no logs (only snapshot or nothing)
{
/// Just to be sure they don't exist
removeAllLogs();
min_log_id = last_commited_log_index;
max_log_id = last_commited_log_index == 0 ? 0 : last_commited_log_index - 1;
}
/// Found some broken or non finished logs
/// We have to remove broken data and continue to write into incomplete log.
if (first_incomplete_log_start_index != -1) /// otherwise all logs completed so just start a new one
else if (last_log_is_not_complete) /// if it's complete just start new one
{
auto start_remove_from = existing_changelogs.begin();
if (started)
start_remove_from = existing_changelogs.upper_bound(first_incomplete_log_start_index);
assert(last_log_read_result != std::nullopt);
/// Actually they shouldn't exist, but to be sure we remove them
removeAllLogsAfter(last_log_read_result->log_start_index);
/// All subsequent logs shouldn't exist. But they may exist if we crashed after writeAt started. Remove them.
for (auto itr = start_remove_from; itr != existing_changelogs.end();)
{
LOG_WARNING(log, "Removing changelog {}, because it's goes after broken changelog entry", itr->second.path);
std::filesystem::remove(itr->second.path);
itr = existing_changelogs.erase(itr);
assert(!existing_changelogs.empty());
assert(existing_changelogs.find(last_log_read_result->log_start_index)->first == existing_changelogs.rbegin()->first);
/// Continue to write into incomplete existing log
auto description = existing_changelogs[last_log_read_result->log_start_index];
if (last_log_read_result->error)
initWriter(description, last_log_read_result->total_entries_read_from_log, /* truncate_after = */ last_log_read_result->last_position);
else
initWriter(description, last_log_read_result->total_entries_read_from_log);
}
/// Continue to write into existing log
if (!existing_changelogs.empty())
{
auto description = existing_changelogs.rbegin()->second;
/// Start new log if we don't initialize writer from previous log. All logs can be "complete".
if (!current_writer)
rotate(max_log_id + 1);
}
void Changelog::initWriter(const ChangelogFileDescription & description, uint64_t entries_already_written, std::optional<uint64_t> truncate_to_offset)
{
if (description.expectedEntriesCountInLog() != rotate_interval)
LOG_TRACE(log, "Looks like rotate_logs_interval was changed, current {}, expected entries in last log {}", rotate_interval, description.expectedEntriesCountInLog());
LOG_TRACE(log, "Continue to write into {}", description.path);
current_writer = std::make_unique<ChangelogWriter>(description.path, WriteMode::Append, description.from_log_index);
current_writer->setEntriesWritten(last_log_read_result.entries_read);
current_writer->setEntriesWritten(entries_already_written);
/// Truncate all broken entries from log
if (last_log_read_result.error)
if (truncate_to_offset)
{
LOG_WARNING(log, "Read finished with error, truncating all broken log entries");
current_writer->truncateToLength(last_log_read_result.last_position);
}
}
LOG_WARNING(log, "Changelog {} contain broken enties, truncating all broken log entries", description.path);
current_writer->truncateToLength(*truncate_to_offset);
}
}
/// Start new log if we don't initialize writer from previous log
if (!current_writer)
rotate(max_log_id + 1);
void Changelog::removeAllLogsAfter(uint64_t start_to_remove_from_id)
{
auto start_to_remove_from = existing_changelogs.upper_bound(start_to_remove_from_id);
/// All subsequent logs shouldn't exist. But they may exist if we crashed after writeAt started. Remove them.
for (auto itr = start_to_remove_from; itr != existing_changelogs.end();)
{
LOG_WARNING(log, "Removing changelog {}, because it's goes after broken changelog entry", itr->second.path);
std::filesystem::remove(itr->second.path);
itr = existing_changelogs.erase(itr);
}
}
void Changelog::removeAllLogs()
{
LOG_WARNING(log, "Removing all changelogs");
for (auto itr = existing_changelogs.begin(); itr != existing_changelogs.end();)
{
LOG_WARNING(log, "Removing changelog {}, because it's goes after broken changelog entry", itr->second.path);
std::filesystem::remove(itr->second.path);
itr = existing_changelogs.erase(itr);
}
}
void Changelog::rotate(uint64_t new_start_log_index)

View File

@ -2,6 +2,7 @@
#include <libnuraft/nuraft.hxx> // Y_IGNORE
#include <city.h>
#include <optional>
#include <IO/WriteBufferFromFile.h>
#include <IO/HashingWriteBuffer.h>
#include <Compression/CompressedWriteBuffer.h>
@ -128,6 +129,11 @@ private:
/// Starts new file [new_start_log_index, new_start_log_index + rotate_interval]
void rotate(uint64_t new_start_log_index);
void removeAllLogsAfter(uint64_t start_to_remove_from_id);
void removeAllLogs();
void initWriter(const ChangelogFileDescription & description, uint64_t entries_already_written, std::optional<uint64_t> truncate_to_offset = {});
private:
const std::string changelogs_dir;
const uint64_t rotate_interval;

View File

@ -312,12 +312,10 @@ void KeeperDispatcher::shutdown()
if (server)
server->shutdown();
if (requests_queue)
{
KeeperStorage::RequestForSession request_for_session;
/// Set session expired for all pending requests
while (requests_queue->tryPop(request_for_session))
while (requests_queue && requests_queue->tryPop(request_for_session))
{
if (request_for_session.request)
{
@ -330,7 +328,6 @@ void KeeperDispatcher::shutdown()
break;
}
}
}
/// Clear all registered sessions
std::lock_guard lock(session_to_response_callback_mutex);

View File

@ -192,7 +192,7 @@ struct SocketInterruptablePollWrapper
KeeperTCPHandler::KeeperTCPHandler(IServer & server_, const Poco::Net::StreamSocket & socket_)
: Poco::Net::TCPServerConnection(socket_)
, server(server_)
, log(&Poco::Logger::get("NuKeeperTCPHandler"))
, log(&Poco::Logger::get("KeeperTCPHandler"))
, global_context(Context::createCopy(server.context()))
, keeper_dispatcher(global_context->getKeeperDispatcher())
, operation_timeout(0, global_context->getConfigRef().getUInt("keeper_server.operation_timeout_ms", Coordination::DEFAULT_OPERATION_TIMEOUT_MS) * 1000)