Merge pull request #45039 from ClickHouse/fix-coordination-unit-tests

Small fixes for Coordination unit tests
This commit is contained in:
Alexey Milovidov 2023-01-10 05:12:37 +03:00 committed by GitHub
commit 3b086d1990
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 51 additions and 32 deletions

View File

@ -341,6 +341,7 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin
min_log_id = last_commited_log_index;
max_log_id = last_commited_log_index == 0 ? 0 : last_commited_log_index - 1;
rotate(max_log_id + 1, writer_lock);
initialized = true;
return;
}
else if (changelog_description.from_log_index > start_to_read_from)
@ -611,7 +612,11 @@ void Changelog::appendEntry(uint64_t index, const LogEntryPtr & log_entry)
void Changelog::writeAt(uint64_t index, const LogEntryPtr & log_entry)
{
if (!initialized)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Changelog must be initialized before writing records");
{
std::lock_guard lock(writer_mutex);
/// This write_at require to overwrite everything in this file and also in previous file(s)
const bool go_to_previous_file = index < current_writer->getStartIndex();
@ -650,6 +655,9 @@ void Changelog::writeAt(uint64_t index, const LogEntryPtr & log_entry)
void Changelog::compact(uint64_t up_to_log_index)
{
if (!initialized)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Changelog must be initialized before compacting records");
std::lock_guard lock(writer_mutex);
LOG_INFO(log, "Compact logs up to log index {}, our max log id is {}", up_to_log_index, max_log_id);
@ -810,6 +818,9 @@ void Changelog::flush()
bool Changelog::flushAsync()
{
if (!initialized)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Changelog must be initialized before flushing records");
bool pushed = write_operations.push(Flush{max_log_id});
if (!pushed)
LOG_WARNING(log, "Changelog is shut down");
@ -846,17 +857,14 @@ Changelog::~Changelog()
void Changelog::cleanLogThread()
{
while (!log_files_to_delete_queue.isFinishedAndEmpty())
std::string path;
while (log_files_to_delete_queue.pop(path))
{
std::string path;
if (log_files_to_delete_queue.pop(path))
{
std::error_code ec;
if (std::filesystem::remove(path, ec))
LOG_INFO(log, "Removed changelog {} because of compaction.", path);
else
LOG_WARNING(log, "Failed to remove changelog {} in compaction, error message: {}", path, ec.message());
}
std::error_code ec;
if (std::filesystem::remove(path, ec))
LOG_INFO(log, "Removed changelog {} because of compaction.", path);
else
LOG_WARNING(log, "Failed to remove changelog {} in compaction, error message: {}", path, ec.message());
}
}

View File

@ -248,10 +248,10 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, n
session_id = storage->getSessionID(session_id_request.session_timeout_ms);
LOG_DEBUG(log, "Session ID response {} with timeout {}", session_id, session_id_request.session_timeout_ms);
response->session_id = session_id;
if (!responses_queue.push(response_for_session) && !responses_queue.isFinished())
if (!responses_queue.push(response_for_session))
{
ProfileEvents::increment(ProfileEvents::KeeperCommitsFailed);
throw Exception(ErrorCodes::SYSTEM_ERROR, "Could not push response with session id {} into responses queue", session_id);
LOG_WARNING(log, "Failed to push response with session id {} to the queue, probably because of shutdown", session_id);
}
}
}
@ -261,13 +261,10 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, n
KeeperStorage::ResponsesForSessions responses_for_sessions = storage->processRequest(
request_for_session.request, request_for_session.session_id, request_for_session.zxid);
for (auto & response_for_session : responses_for_sessions)
if (!responses_queue.push(response_for_session) && !responses_queue.isFinished())
if (!responses_queue.push(response_for_session))
{
ProfileEvents::increment(ProfileEvents::KeeperCommitsFailed);
throw Exception(
ErrorCodes::SYSTEM_ERROR,
"Could not push response with session id {} into responses queue",
response_for_session.session_id);
LOG_WARNING(log, "Failed to push response with session id {} to the queue, probably because of shutdown", response_for_session.session_id);
}
if (keeper_context->digest_enabled && request_for_session.digest)
@ -523,9 +520,8 @@ void KeeperStateMachine::processReadRequest(const KeeperStorage::RequestForSessi
true /*check_acl*/,
true /*is_local*/);
for (const auto & response : responses)
if (!responses_queue.push(response) && !responses_queue.isFinished())
throw Exception(
ErrorCodes::SYSTEM_ERROR, "Could not push response with session id {} into responses queue", response.session_id);
if (!responses_queue.push(response))
LOG_WARNING(log, "Failed to push response with session id {} to the queue, probably because of shutdown", response.session_id);
}
void KeeperStateMachine::shutdownStorage()

View File

@ -416,6 +416,24 @@ TEST_P(CoordinationTest, ChangelogTestAppendAfterRead)
EXPECT_EQ(logs_count, 3);
}
namespace
{
void assertFileDeleted(std::string path)
{
for (size_t i = 0; i < 100; ++i)
{
if (!fs::exists(path))
return;
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
FAIL() << "File " << path << " was not removed";
}
}
TEST_P(CoordinationTest, ChangelogTestCompaction)
{
auto params = GetParam();
@ -440,6 +458,7 @@ TEST_P(CoordinationTest, ChangelogTestCompaction)
EXPECT_EQ(changelog.start_index(), 3);
EXPECT_EQ(changelog.next_slot(), 4);
EXPECT_EQ(changelog.last_entry()->get_term(), 20);
// nothing should be deleted
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + params.extension));
auto e1 = getLogEntry("hello world", 30);
@ -460,7 +479,7 @@ TEST_P(CoordinationTest, ChangelogTestCompaction)
changelog.compact(6);
std::this_thread::sleep_for(std::chrono::microseconds(1000));
EXPECT_FALSE(fs::exists("./logs/changelog_1_5.bin" + params.extension));
assertFileDeleted("./logs/changelog_1_5.bin" + params.extension);
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + params.extension));
EXPECT_EQ(changelog.size(), 1);
@ -1418,13 +1437,8 @@ void testLogAndStateMachine(Coordination::CoordinationSettingsPtr settings, uint
snapshot_task.create_snapshot(std::move(snapshot_task.snapshot));
}
if (snapshot_created)
{
if (changelog.size() > settings->reserved_log_items)
{
changelog.compact(i - settings->reserved_log_items);
}
}
if (snapshot_created && changelog.size() > settings->reserved_log_items)
changelog.compact(i - settings->reserved_log_items);
}
SnapshotsQueue snapshots_queue1{1};
@ -1620,7 +1634,7 @@ TEST_P(CoordinationTest, TestRotateIntervalChanges)
changelog_2.compact(105);
std::this_thread::sleep_for(std::chrono::microseconds(1000));
EXPECT_FALSE(fs::exists("./logs/changelog_1_100.bin" + params.extension));
assertFileDeleted("./logs/changelog_1_100.bin" + params.extension);
EXPECT_TRUE(fs::exists("./logs/changelog_101_110.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_111_117.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_118_124.bin" + params.extension));
@ -1641,9 +1655,9 @@ TEST_P(CoordinationTest, TestRotateIntervalChanges)
changelog_3.compact(125);
std::this_thread::sleep_for(std::chrono::microseconds(1000));
EXPECT_FALSE(fs::exists("./logs/changelog_101_110.bin" + params.extension));
EXPECT_FALSE(fs::exists("./logs/changelog_111_117.bin" + params.extension));
EXPECT_FALSE(fs::exists("./logs/changelog_118_124.bin" + params.extension));
assertFileDeleted("./logs/changelog_101_110.bin" + params.extension);
assertFileDeleted("./logs/changelog_111_117.bin" + params.extension);
assertFileDeleted("./logs/changelog_118_124.bin" + params.extension);
EXPECT_TRUE(fs::exists("./logs/changelog_125_131.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_132_136.bin" + params.extension));
@ -2161,6 +2175,7 @@ TEST_P(CoordinationTest, TestDurableState)
const auto reload_state_manager = [&]
{
state_manager.emplace(1, "localhost", 9181, "./logs", "./state");
state_manager->loadLogStore(1, 0);
};
reload_state_manager();