diff --git a/src/Coordination/Changelog.cpp b/src/Coordination/Changelog.cpp
index 3c2004a1b75..d8743d5bb5d 100644
--- a/src/Coordination/Changelog.cpp
+++ b/src/Coordination/Changelog.cpp
@@ -707,6 +707,8 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin
     else
         start_to_read_from = 1;
 
+    uint64_t last_read_index = 0;
+
     /// Got through changelog files in order of start_index
    for (const auto & [changelog_start_index, changelog_description_ptr] : existing_changelogs)
    {
@@ -747,27 +749,29 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin
                        changelog_description.from_log_index);
                }
            }
-            else if ((changelog_description.from_log_index - last_log_read_result->last_read_index) > 1)
+            else if ((changelog_description.from_log_index - last_read_index) > 1)
            {
-                LOG_ERROR(
-                    log,
-                    "Some records were lost, last found log index {}, while the next log index on disk is {}. Hopefully will receive "
-                    "missing records from leader.",
-                    last_log_read_result->last_read_index,
-                    changelog_description.from_log_index);
-                removeAllLogsAfter(last_log_read_result->log_start_index);
+                if (!last_log_read_result->error)
+                {
+                    LOG_ERROR(
+                        log,
+                        "Some records were lost, last found log index {}, while the next log index on disk is {}. Hopefully will receive "
+                        "missing records from leader.",
+                        last_read_index,
+                        changelog_description.from_log_index);
+                    removeAllLogsAfter(last_log_read_result->log_start_index);
+                }
                break;
            }
 
            ChangelogReader reader(changelog_description.disk, changelog_description.path);
            last_log_read_result = reader.readChangelog(logs, start_to_read_from, log);
+
+            if (last_log_read_result->last_read_index != 0)
+                last_read_index = last_log_read_result->last_read_index;
+
            last_log_read_result->log_start_index = changelog_description.from_log_index;
 
-            if (last_log_read_result->error)
-            {
-                last_log_is_not_complete = true;
-                break;
-            }
            /// Otherwise we have already initialized it
            if (min_log_id == 0)
                min_log_id = last_log_read_result->first_read_index;
@@ -779,14 +783,20 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin
            uint64_t log_count = changelog_description.expectedEntriesCountInLog();
 
            /// Unfinished log
-            if (last_log_read_result->error || last_log_read_result->total_entries_read_from_log < log_count)
-            {
-                last_log_is_not_complete = true;
-                break;
-            }
+            last_log_is_not_complete = last_log_read_result->error || last_log_read_result->total_entries_read_from_log < log_count;
        }
    }
 
+    const auto move_from_latest_logs_disks = [&](auto & description)
+    {
+        /// check if we need to move completed log to another disk
+        auto latest_log_disk = getLatestLogDisk();
+        auto disk = getDisk();
+
+        if (latest_log_disk != disk && latest_log_disk == description->disk)
+            moveFileBetweenDisks(latest_log_disk, description, disk, description->path);
+    };
+
    /// we can have empty log (with zero entries) and last_log_read_result will be initialized
    if (!last_log_read_result || min_log_id == 0) /// We just may have no logs (only snapshot or nothing)
    {
@@ -813,23 +823,34 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin
        assert(last_log_read_result != std::nullopt);
        assert(!existing_changelogs.empty());
 
-        /// Actually they shouldn't exist, but to be sure we remove them
-        removeAllLogsAfter(last_log_read_result->log_start_index);
-
-        /// This log, even if it finished with error shouldn't be removed
-        assert(existing_changelogs.find(last_log_read_result->log_start_index) != existing_changelogs.end());
-        assert(existing_changelogs.find(last_log_read_result->log_start_index)->first == existing_changelogs.rbegin()->first);
-
        /// Continue to write into incomplete existing log if it didn't finish with error
        const auto & description = existing_changelogs[last_log_read_result->log_start_index];
 
-        if (last_log_read_result->last_read_index == 0 || last_log_read_result->error) /// If it's broken log then remove it
+        const auto remove_invalid_logs = [&]
        {
-            LOG_INFO(log, "Removing changelog {} because it's empty or read finished with error", description->path);
+            /// Actually they shouldn't exist, but to be sure we remove them
+            removeAllLogsAfter(last_log_read_result->log_start_index);
+
+            /// This log, even if it finished with error shouldn't be removed
+            chassert(existing_changelogs.find(last_log_read_result->log_start_index) != existing_changelogs.end());
+            chassert(existing_changelogs.find(last_log_read_result->log_start_index)->first == existing_changelogs.rbegin()->first);
+        };
+
+        if (last_log_read_result->last_read_index == 0) /// If it's broken or empty log then remove it
+        {
+            LOG_INFO(log, "Removing changelog {} because it's empty", description->path);
+            remove_invalid_logs();
            description->disk->removeFile(description->path);
            existing_changelogs.erase(last_log_read_result->log_start_index);
            std::erase_if(logs, [last_log_read_result](const auto & item) { return item.first >= last_log_read_result->log_start_index; });
        }
+        else if (last_log_read_result->error)
+        {
+            LOG_INFO(log, "Changelog {} read finished with error but some logs were read from it, file will not be removed", description->path);
+            remove_invalid_logs();
+            std::erase_if(logs, [last_log_read_result](const auto & item) { return item.first > last_log_read_result->last_read_index; });
+            move_from_latest_logs_disks(existing_changelogs.at(last_log_read_result->log_start_index));
+        }
        else
        {
            initWriter(description);
        }
    }
    else if (last_log_read_result.has_value())
    {
-        /// check if we need to move completed log to another disk
-        auto latest_log_disk = getLatestLogDisk();
-        auto disk = getDisk();
-
-        auto & description = existing_changelogs.at(last_log_read_result->log_start_index);
-        if (latest_log_disk != disk && latest_log_disk == description->disk)
-            moveFileBetweenDisks(latest_log_disk, description, disk, description->path);
+        move_from_latest_logs_disks(existing_changelogs.at(last_log_read_result->log_start_index));
    }
 
    /// Start new log if we don't initialize writer from previous log. All logs can be "complete".
@@ -927,17 +942,19 @@ void Changelog::removeExistingLogs(ChangelogIter begin, ChangelogIter end)
 
    for (auto itr = begin; itr != end;)
    {
+        auto & changelog_description = itr->second;
+
        if (!disk->exists(timestamp_folder))
        {
            LOG_WARNING(log, "Moving broken logs to {}", timestamp_folder);
            disk->createDirectories(timestamp_folder);
        }
 
-        LOG_WARNING(log, "Removing changelog {}", itr->second->path);
-        const std::filesystem::path & path = itr->second->path;
+        LOG_WARNING(log, "Removing changelog {}", changelog_description->path);
+        const std::filesystem::path & path = changelog_description->path;
        const auto new_path = timestamp_folder / path.filename();
 
-        auto changelog_disk = itr->second->disk;
+        auto changelog_disk = changelog_description->disk;
 
        if (changelog_disk == disk)
        {
            try
@@ -947,11 +964,11 @@ void Changelog::removeExistingLogs(ChangelogIter begin, ChangelogIter end)
            catch (const DB::Exception & e)
            {
                if (e.code() == DB::ErrorCodes::NOT_IMPLEMENTED)
-                    moveFileBetweenDisks(changelog_disk, itr->second, disk, new_path);
+                    moveFileBetweenDisks(changelog_disk, changelog_description, disk, new_path);
            }
        }
        else
-            moveFileBetweenDisks(changelog_disk, itr->second, disk, new_path);
+            moveFileBetweenDisks(changelog_disk, changelog_description, disk, new_path);
 
        itr = existing_changelogs.erase(itr);
    }
diff --git a/src/Coordination/Changelog.h b/src/Coordination/Changelog.h
index dddcb9aa218..a9e4d48fa36 100644
--- a/src/Coordination/Changelog.h
+++ b/src/Coordination/Changelog.h
@@ -167,9 +167,9 @@ private:
    std::map<uint64_t, ChangelogFileDescriptionPtr> existing_changelogs;
 
    using ChangelogIter = decltype(existing_changelogs)::iterator;
+    void removeExistingLogs(ChangelogIter begin, ChangelogIter end);
 
-    static void removeLog(const std::filesystem::path & path, const std::filesystem::path & detached_folder);
    /// Remove all changelogs from disk with start_index bigger than start_to_remove_from_id
    void removeAllLogsAfter(uint64_t remove_after_log_start_index);
    /// Remove all logs from disk
diff --git a/src/Coordination/KeeperDispatcher.cpp b/src/Coordination/KeeperDispatcher.cpp
index ca454c18084..f69a9c11e97 100644
--- a/src/Coordination/KeeperDispatcher.cpp
+++ b/src/Coordination/KeeperDispatcher.cpp
@@ -367,9 +367,9 @@ void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & conf
 {
    LOG_DEBUG(log, "Initializing storage dispatcher");
 
+    keeper_context = std::make_shared<KeeperContext>(standalone_keeper);
    configuration_and_settings = KeeperConfigurationAndSettings::loadFromConfig(config, standalone_keeper);
 
-    keeper_context = std::make_shared<KeeperContext>(standalone_keeper);
    keeper_context->initialize(config, this);
 
    requests_queue = std::make_unique<RequestsQueue>(configuration_and_settings->coordination_settings->max_request_queue_size);
@@ -452,7 +452,7 @@ void KeeperDispatcher::shutdown()
    try
    {
        {
-            if (keeper_context->shutdown_called.exchange(true))
+            if (!keeper_context || keeper_context->shutdown_called.exchange(true))
                return;
 
            LOG_DEBUG(log, "Shutting down storage dispatcher");
diff --git a/src/Coordination/KeeperServer.cpp b/src/Coordination/KeeperServer.cpp
index 656d009e0a7..56c873bbbb9 100644
--- a/src/Coordination/KeeperServer.cpp
+++ b/src/Coordination/KeeperServer.cpp
@@ -4,7 +4,6 @@
 #include "config.h"
 
 #include
-#include
 #include
 #include
 #include
@@ -617,6 +616,7 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ
    {
        const auto preprocess_logs = [&]
        {
+            keeper_context->local_logs_preprocessed = true;
            auto log_store = state_manager->load_log_store();
            if (last_log_idx_on_disk > 0 && last_log_idx_on_disk > state_machine->last_commit_index())
            {
@@ -642,7 +642,6 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ
            {
                LOG_INFO(log, "All local log entries preprocessed");
            }
-            keeper_context->local_logs_preprocessed = true;
        };
 
        switch (type)
diff --git a/src/Coordination/KeeperStateMachine.cpp b/src/Coordination/KeeperStateMachine.cpp
index b6185fa2e36..dcdd724f2bd 100644
--- a/src/Coordination/KeeperStateMachine.cpp
+++ b/src/Coordination/KeeperStateMachine.cpp
@@ -1,5 +1,4 @@
 #include
-#include
 #include
 #include
 #include
@@ -162,6 +161,15 @@ void assertDigest(
 
 nuraft::ptr<nuraft::buffer> KeeperStateMachine::pre_commit(uint64_t log_idx, nuraft::buffer & data)
 {
+    auto result = nuraft::buffer::alloc(sizeof(log_idx));
+    nuraft::buffer_serializer ss(result);
+    ss.put_u64(log_idx);
+
+    /// Don't preprocess anything until the first commit when we will manually pre_commit and commit
+    /// all needed logs
+    if (!keeper_context->local_logs_preprocessed)
+        return result;
+
    auto request_for_session = parseRequest(data, /*final=*/false);
    if (!request_for_session->zxid)
        request_for_session->zxid = log_idx;
@@ -169,9 +177,6 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::pre_commit(uint64_t log_idx, nur
    request_for_session->log_idx = log_idx;
 
    preprocess(*request_for_session);
-
-    auto result = nuraft::buffer::alloc(sizeof(log_idx));
-    nuraft::buffer_serializer ss(result);
-    ss.put_u64(log_idx);
    return result;
 }
@@ -506,6 +511,10 @@ void KeeperStateMachine::commit_config(const uint64_t log_idx, nuraft::ptr<nura
 
 void KeeperStateMachine::rollback(uint64_t log_idx, nuraft::buffer & data)
 {
+    /// Don't preprocess anything until the first commit when we will manually pre_commit and commit
+    /// all needed logs
+    if (!keeper_context->local_logs_preprocessed)
+        return;
+
    auto request_for_session = parseRequest(data, true);
    // If we received a log from an older node, use the log_idx as the zxid
    // log_idx will always be larger or equal to the zxid so we can safely do this
diff --git a/src/Coordination/tests/gtest_coordination.cpp b/src/Coordination/tests/gtest_coordination.cpp
index 418b5225fa4..62885066dbe 100644
--- a/src/Coordination/tests/gtest_coordination.cpp
+++ b/src/Coordination/tests/gtest_coordination.cpp
@@ -1048,6 +1048,7 @@ TEST_P(CoordinationTest, ChangelogTestReadAfterBrokenTruncate)
    EXPECT_EQ(changelog_reader2.last_entry()->get_term(), 7777);
 }
 
+/// Truncating all entries
 TEST_P(CoordinationTest, ChangelogTestReadAfterBrokenTruncate2)
 {
    auto params = GetParam();
@@ -1102,6 +1103,61 @@ TEST_P(CoordinationTest, ChangelogTestReadAfterBrokenTruncate2)
    EXPECT_EQ(changelog_reader2.last_entry()->get_term(), 7777);
 }
 
+/// Truncating only some entries from the end
+TEST_P(CoordinationTest, ChangelogTestReadAfterBrokenTruncate3)
+{
+    auto params = GetParam();
+
+    /// For compressed logs we have no reliable way of knowing how many log entries were lost
+    /// after we truncate some bytes from the end
+    if (!params.extension.empty())
+        return;
+
+    ChangelogDirTest test("./logs");
+    setLogDirectory("./logs");
+
+    DB::KeeperLogStore changelog(
+        DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 20},
+        DB::FlushSettings(),
+        keeper_context);
+    changelog.init(1, 0);
+
+    for (size_t i = 0; i < 35; ++i)
+    {
+        auto entry = getLogEntry(std::to_string(i) + "_hello_world", (i + 44) * 10);
+        changelog.append(entry);
+    }
+
+    changelog.end_of_append_batch(0, 0);
+
+    waitDurableLogs(changelog);
+    EXPECT_TRUE(fs::exists("./logs/changelog_1_20.bin" + params.extension));
+    EXPECT_TRUE(fs::exists("./logs/changelog_21_40.bin" + params.extension));
+
+    DB::WriteBufferFromFile plain_buf(
+        "./logs/changelog_1_20.bin" + params.extension, DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY);
+    plain_buf.truncate(plain_buf.size() - 30);
+
+    DB::KeeperLogStore changelog_reader(
+        DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 20},
+        DB::FlushSettings(),
+        keeper_context);
+    changelog_reader.init(1, 0);
+
+    EXPECT_EQ(changelog_reader.size(), 19);
+    EXPECT_TRUE(fs::exists("./logs/changelog_1_20.bin" + params.extension));
+    assertBrokenLogRemoved("./logs", "changelog_21_40.bin" + params.extension);
+    EXPECT_TRUE(fs::exists("./logs/changelog_20_39.bin" + params.extension));
+    auto entry = getLogEntry("hello_world", 7777);
+    changelog_reader.append(entry);
+    changelog_reader.end_of_append_batch(0, 0);
+
+    waitDurableLogs(changelog_reader);
+
+    EXPECT_EQ(changelog_reader.size(), 20);
+    EXPECT_EQ(changelog_reader.last_entry()->get_term(), 7777);
+}
+
 TEST_P(CoordinationTest, ChangelogTestLostFiles)
 {
    auto params = GetParam();
diff --git a/tests/integration/test_keeper_broken_logs/__init__.py b/tests/integration/test_keeper_broken_logs/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/tests/integration/test_keeper_broken_logs/configs/enable_keeper1.xml b/tests/integration/test_keeper_broken_logs/configs/enable_keeper1.xml
new file mode 100644
index 00000000000..870326838e6
--- /dev/null
+++ b/tests/integration/test_keeper_broken_logs/configs/enable_keeper1.xml
@@ -0,0 +1,44 @@
+<clickhouse>
+    <keeper_server>
+        <use_cluster>false</use_cluster>
+        <tcp_port>9181</tcp_port>
+        <server_id>1</server_id>
+        <log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
+        <snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
+        <create_snapshot_on_exit>false</create_snapshot_on_exit>
+
+        <coordination_settings>
+            <operation_timeout_ms>5000</operation_timeout_ms>
+            <session_timeout_ms>10000</session_timeout_ms>
+            <snapshot_distance>75</snapshot_distance>
+            <raft_logs_level>trace</raft_logs_level>
+            <force_sync>false</force_sync>
+        </coordination_settings>
+
+        <raft_configuration>
+            <server>
+                <id>1</id>
+                <hostname>node1</hostname>
+                <port>9234</port>
+                <can_become_leader>true</can_become_leader>
+                <priority>3</priority>
+            </server>
+            <server>
+                <id>2</id>
+                <hostname>node2</hostname>
+                <port>9234</port>
+                <can_become_leader>true</can_become_leader>
+                <start_as_follower>true</start_as_follower>
+                <priority>2</priority>
+            </server>
+            <server>
+                <id>3</id>
+                <hostname>node3</hostname>
+                <port>9234</port>
+                <can_become_leader>true</can_become_leader>
+                <start_as_follower>true</start_as_follower>
+                <priority>1</priority>
+            </server>
+        </raft_configuration>
+    </keeper_server>
+</clickhouse>
diff --git a/tests/integration/test_keeper_broken_logs/configs/enable_keeper2.xml b/tests/integration/test_keeper_broken_logs/configs/enable_keeper2.xml
new file mode 100644
index 00000000000..ee2ff903dff
--- /dev/null
+++ b/tests/integration/test_keeper_broken_logs/configs/enable_keeper2.xml
@@ -0,0 +1,43 @@
+<clickhouse>
+    <keeper_server>
+        <use_cluster>false</use_cluster>
+        <tcp_port>9181</tcp_port>
+        <server_id>2</server_id>
+        <log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
+        <snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
+
+        <coordination_settings>
+            <operation_timeout_ms>5000</operation_timeout_ms>
+            <session_timeout_ms>10000</session_timeout_ms>
+            <snapshot_distance>75</snapshot_distance>
+            <raft_logs_level>trace</raft_logs_level>
+            <force_sync>false</force_sync>
+        </coordination_settings>
+
+        <raft_configuration>
+            <server>
+                <id>1</id>
+                <hostname>node1</hostname>
+                <port>9234</port>
+                <can_become_leader>true</can_become_leader>
+                <priority>3</priority>
+            </server>
+            <server>
+                <id>2</id>
+                <hostname>node2</hostname>
+                <port>9234</port>
+                <can_become_leader>true</can_become_leader>
+                <start_as_follower>true</start_as_follower>
+                <priority>2</priority>
+            </server>
+            <server>
+                <id>3</id>
+                <hostname>node3</hostname>
+                <port>9234</port>
+                <can_become_leader>true</can_become_leader>
+                <start_as_follower>true</start_as_follower>
+                <priority>1</priority>
+            </server>
+        </raft_configuration>
+    </keeper_server>
+</clickhouse>
diff --git a/tests/integration/test_keeper_broken_logs/configs/enable_keeper3.xml b/tests/integration/test_keeper_broken_logs/configs/enable_keeper3.xml
new file mode 100644
index 00000000000..a16fea43125
--- /dev/null
+++ b/tests/integration/test_keeper_broken_logs/configs/enable_keeper3.xml
@@ -0,0 +1,43 @@
+<clickhouse>
+    <keeper_server>
+        <use_cluster>false</use_cluster>
+        <tcp_port>9181</tcp_port>
+        <server_id>3</server_id>
+        <log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
+        <snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
+
+        <coordination_settings>
+            <operation_timeout_ms>5000</operation_timeout_ms>
+            <session_timeout_ms>10000</session_timeout_ms>
+            <snapshot_distance>75</snapshot_distance>
+            <raft_logs_level>trace</raft_logs_level>
+            <force_sync>false</force_sync>
+        </coordination_settings>
+
+        <raft_configuration>
+            <server>
+                <id>1</id>
+                <hostname>node1</hostname>
+                <port>9234</port>
+                <can_become_leader>true</can_become_leader>
+                <priority>3</priority>
+            </server>
+            <server>
+                <id>2</id>
+                <hostname>node2</hostname>
+                <port>9234</port>
+                <can_become_leader>true</can_become_leader>
+                <start_as_follower>true</start_as_follower>
+                <priority>2</priority>
+            </server>
+            <server>
+                <id>3</id>
+                <hostname>node3</hostname>
+                <port>9234</port>
+                <can_become_leader>true</can_become_leader>
+                <start_as_follower>true</start_as_follower>
+                <priority>1</priority>
+            </server>
+        </raft_configuration>
+    </keeper_server>
+</clickhouse>
diff --git a/tests/integration/test_keeper_broken_logs/test.py b/tests/integration/test_keeper_broken_logs/test.py
new file mode 100644
index 00000000000..e283d946174
--- /dev/null
+++ b/tests/integration/test_keeper_broken_logs/test.py
@@ -0,0 +1,128 @@
+import pytest
+from helpers.cluster import ClickHouseCluster
+import helpers.keeper_utils as keeper_utils
+import random
+import string
+import os
+import time
+from multiprocessing.dummy import Pool
+from helpers.network import PartitionManager
+from helpers.test_tools import assert_eq_with_retry
+
+cluster = ClickHouseCluster(__file__)
+node1 = cluster.add_instance(
+    "node1",
+    main_configs=["configs/enable_keeper1.xml"],
+    stay_alive=True,
+)
+node2 = cluster.add_instance(
+    "node2",
+    main_configs=["configs/enable_keeper2.xml"],
+    stay_alive=True,
+)
+node3 = cluster.add_instance(
+    "node3",
+    main_configs=["configs/enable_keeper3.xml"],
+    stay_alive=True,
+)
+
+from kazoo.client import KazooClient, KazooState
+
+
+@pytest.fixture(scope="module")
+def started_cluster():
+    try:
+        cluster.start()
+
+        yield cluster
+
+    finally:
+        cluster.shutdown()
+
+
+def smaller_exception(ex):
+    return "\n".join(str(ex).split("\n")[0:2])
+
+
+def wait_nodes():
+    keeper_utils.wait_nodes(cluster, [node1, node2, node3])
+
+
+def get_fake_zk(nodename, timeout=30.0):
+    _fake_zk_instance = KazooClient(
+        hosts=cluster.get_instance_ip(nodename) + ":9181", timeout=timeout
+    )
+    _fake_zk_instance.start()
+    return _fake_zk_instance
+
+
+def test_single_node_broken_log(started_cluster):
+    try:
+        wait_nodes()
+        node1_conn = get_fake_zk("node1")
+
+        # Cleanup
+        if node1_conn.exists("/test_broken_log") != None:
+            node1_conn.delete("/test_broken_log")
+
+        node1_conn.create("/test_broken_log")
+        for _ in range(10):
+            node1_conn.create(f"/test_broken_log/node", b"somedata1", sequence=True)
+
+        def verify_nodes(zk_conn):
+            children = zk_conn.get_children("/test_broken_log")
+            assert len(children) == 10
+
+            for child in children:
+                assert zk_conn.get("/test_broken_log/" + child)[0] == b"somedata1"
+
+        verify_nodes(node1_conn)
+
+        node1_conn.stop()
+        node1_conn.close()
+
+        node1.stop_clickhouse()
+        node1.exec_in_container(
+            [
+                "truncate",
+                "-s",
+                "-50",
+                "/var/lib/clickhouse/coordination/log/changelog_1_100000.bin",
+            ]
+        )
+        node1.start_clickhouse()
+        keeper_utils.wait_until_connected(cluster, node1)
+
+        node1_conn = get_fake_zk("node1")
+        node1_conn.create(f"/test_broken_log_final_node", b"somedata1")
+
+        verify_nodes(node1_conn)
+        assert node1_conn.get("/test_broken_log_final_node")[0] == b"somedata1"
+
+        node2_conn = get_fake_zk("node2")
+        verify_nodes(node2_conn)
+        assert node2_conn.get("/test_broken_log_final_node")[0] == b"somedata1"
+
+        node3_conn = get_fake_zk("node3")
+        verify_nodes(node3_conn)
+        assert node3_conn.get("/test_broken_log_final_node")[0] == b"somedata1"
+
+        assert (
+            node1.exec_in_container(["ls", "/var/lib/clickhouse/coordination/log"])
+            == "changelog_1_100000.bin\nchangelog_14_100013.bin\n"
+        )
+        assert (
+            node2.exec_in_container(["ls", "/var/lib/clickhouse/coordination/log"])
+            == "changelog_1_100000.bin\n"
+        )
+        assert (
+            node3.exec_in_container(["ls", "/var/lib/clickhouse/coordination/log"])
+            == "changelog_1_100000.bin\n"
+        )
+    finally:
+        try:
+            for zk_conn in [node1_conn, node2_conn, node3_conn]:
+                zk_conn.stop()
+                zk_conn.close()
+        except:
+            pass
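
Illustrative sketch (not part of the patch): the recovery rule the Changelog changes above implement is "keep the longest valid prefix of the log, drop everything after the first broken record, and rely on the Raft leader to resend the lost tail". A minimal Python model of that rule, with invented names, for illustration only:

# Hypothetical model of the prefix-recovery behavior in
# Changelog::readChangelogAndInitWriter (illustration, not ClickHouse code).
def recover_entries(entries, is_valid):
    """Keep the longest valid prefix of a changelog file."""
    good = []
    for entry in entries:
        if not is_valid(entry):
            break  # first corrupted record: stop reading, keep the prefix
        good.append(entry)
    return good

# The truncated-log scenario from the tests above: the fourth record is
# damaged, so only the first three survive a restart; the rest would be
# re-fetched from the leader over Raft.
assert recover_entries([1, 2, 3, -4, 5], lambda e: e > 0) == [1, 2, 3]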