Merge pull request #56670 from ClickHouse/more-reliable-log-handling-keeper

More reliable log handling in Keeper
This commit is contained in:
Antonio Andelic 2023-11-14 14:39:29 +01:00 committed by GitHub
commit 8a63e924a3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 387 additions and 48 deletions

View File

@ -707,6 +707,8 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin
else
start_to_read_from = 1;
uint64_t last_read_index = 0;
/// Got through changelog files in order of start_index
for (const auto & [changelog_start_index, changelog_description_ptr] : existing_changelogs)
{
@ -747,27 +749,29 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin
changelog_description.from_log_index);
}
}
else if ((changelog_description.from_log_index - last_log_read_result->last_read_index) > 1)
else if ((changelog_description.from_log_index - last_read_index) > 1)
{
LOG_ERROR(
log,
"Some records were lost, last found log index {}, while the next log index on disk is {}. Hopefully will receive "
"missing records from leader.",
last_log_read_result->last_read_index,
changelog_description.from_log_index);
removeAllLogsAfter(last_log_read_result->log_start_index);
if (!last_log_read_result->error)
{
LOG_ERROR(
log,
"Some records were lost, last found log index {}, while the next log index on disk is {}. Hopefully will receive "
"missing records from leader.",
last_read_index,
changelog_description.from_log_index);
removeAllLogsAfter(last_log_read_result->log_start_index);
}
break;
}
ChangelogReader reader(changelog_description.disk, changelog_description.path);
last_log_read_result = reader.readChangelog(logs, start_to_read_from, log);
if (last_log_read_result->last_read_index != 0)
last_read_index = last_log_read_result->last_read_index;
last_log_read_result->log_start_index = changelog_description.from_log_index;
if (last_log_read_result->error)
{
last_log_is_not_complete = true;
break;
}
/// Otherwise we have already initialized it
if (min_log_id == 0)
min_log_id = last_log_read_result->first_read_index;
@ -779,14 +783,20 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin
uint64_t log_count = changelog_description.expectedEntriesCountInLog();
/// Unfinished log
if (last_log_read_result->error || last_log_read_result->total_entries_read_from_log < log_count)
{
last_log_is_not_complete = true;
break;
}
last_log_is_not_complete = last_log_read_result->error || last_log_read_result->total_entries_read_from_log < log_count;
}
}
const auto move_from_latest_logs_disks = [&](auto & description)
{
/// check if we need to move completed log to another disk
auto latest_log_disk = getLatestLogDisk();
auto disk = getDisk();
if (latest_log_disk != disk && latest_log_disk == description->disk)
moveFileBetweenDisks(latest_log_disk, description, disk, description->path);
};
/// we can have empty log (with zero entries) and last_log_read_result will be initialized
if (!last_log_read_result || min_log_id == 0) /// We just may have no logs (only snapshot or nothing)
{
@ -813,23 +823,34 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin
assert(last_log_read_result != std::nullopt);
assert(!existing_changelogs.empty());
/// Actually they shouldn't exist, but to be sure we remove them
removeAllLogsAfter(last_log_read_result->log_start_index);
/// This log, even if it finished with error shouldn't be removed
assert(existing_changelogs.find(last_log_read_result->log_start_index) != existing_changelogs.end());
assert(existing_changelogs.find(last_log_read_result->log_start_index)->first == existing_changelogs.rbegin()->first);
/// Continue to write into incomplete existing log if it didn't finish with error
const auto & description = existing_changelogs[last_log_read_result->log_start_index];
if (last_log_read_result->last_read_index == 0 || last_log_read_result->error) /// If it's broken log then remove it
const auto remove_invalid_logs = [&]
{
LOG_INFO(log, "Removing chagelog {} because it's empty or read finished with error", description->path);
/// Actually they shouldn't exist, but to be sure we remove them
removeAllLogsAfter(last_log_read_result->log_start_index);
/// This log, even if it finished with error shouldn't be removed
chassert(existing_changelogs.find(last_log_read_result->log_start_index) != existing_changelogs.end());
chassert(existing_changelogs.find(last_log_read_result->log_start_index)->first == existing_changelogs.rbegin()->first);
};
if (last_log_read_result->last_read_index == 0) /// If it's broken or empty log then remove it
{
LOG_INFO(log, "Removing chagelog {} because it's empty", description->path);
remove_invalid_logs();
description->disk->removeFile(description->path);
existing_changelogs.erase(last_log_read_result->log_start_index);
std::erase_if(logs, [last_log_read_result](const auto & item) { return item.first >= last_log_read_result->log_start_index; });
}
else if (last_log_read_result->error)
{
LOG_INFO(log, "Chagelog {} read finished with error but some logs were read from it, file will not be removed", description->path);
remove_invalid_logs();
std::erase_if(logs, [last_log_read_result](const auto & item) { return item.first > last_log_read_result->last_read_index; });
move_from_latest_logs_disks(existing_changelogs.at(last_log_read_result->log_start_index));
}
else
{
initWriter(description);
@ -837,13 +858,7 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin
}
else if (last_log_read_result.has_value())
{
/// check if we need to move completed log to another disk
auto latest_log_disk = getLatestLogDisk();
auto disk = getDisk();
auto & description = existing_changelogs.at(last_log_read_result->log_start_index);
if (latest_log_disk != disk && latest_log_disk == description->disk)
moveFileBetweenDisks(latest_log_disk, description, disk, description->path);
move_from_latest_logs_disks(existing_changelogs.at(last_log_read_result->log_start_index));
}
/// Start new log if we don't initialize writer from previous log. All logs can be "complete".
@ -927,17 +942,19 @@ void Changelog::removeExistingLogs(ChangelogIter begin, ChangelogIter end)
for (auto itr = begin; itr != end;)
{
auto & changelog_description = itr->second;
if (!disk->exists(timestamp_folder))
{
LOG_WARNING(log, "Moving broken logs to {}", timestamp_folder);
disk->createDirectories(timestamp_folder);
}
LOG_WARNING(log, "Removing changelog {}", itr->second->path);
const std::filesystem::path & path = itr->second->path;
LOG_WARNING(log, "Removing changelog {}", changelog_description->path);
const std::filesystem::path & path = changelog_description->path;
const auto new_path = timestamp_folder / path.filename();
auto changelog_disk = itr->second->disk;
auto changelog_disk = changelog_description->disk;
if (changelog_disk == disk)
{
try
@ -947,11 +964,11 @@ void Changelog::removeExistingLogs(ChangelogIter begin, ChangelogIter end)
catch (const DB::Exception & e)
{
if (e.code() == DB::ErrorCodes::NOT_IMPLEMENTED)
moveFileBetweenDisks(changelog_disk, itr->second, disk, new_path);
moveFileBetweenDisks(changelog_disk, changelog_description, disk, new_path);
}
}
else
moveFileBetweenDisks(changelog_disk, itr->second, disk, new_path);
moveFileBetweenDisks(changelog_disk, changelog_description, disk, new_path);
itr = existing_changelogs.erase(itr);
}

View File

@ -167,9 +167,9 @@ private:
std::map<uint64_t, ChangelogFileDescriptionPtr> existing_changelogs;
using ChangelogIter = decltype(existing_changelogs)::iterator;
void removeExistingLogs(ChangelogIter begin, ChangelogIter end);
static void removeLog(const std::filesystem::path & path, const std::filesystem::path & detached_folder);
/// Remove all changelogs from disk with start_index bigger than start_to_remove_from_id
void removeAllLogsAfter(uint64_t remove_after_log_start_index);
/// Remove all logs from disk

View File

@ -367,9 +367,9 @@ void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & conf
{
LOG_DEBUG(log, "Initializing storage dispatcher");
keeper_context = std::make_shared<KeeperContext>(standalone_keeper);
configuration_and_settings = KeeperConfigurationAndSettings::loadFromConfig(config, standalone_keeper);
keeper_context = std::make_shared<KeeperContext>(standalone_keeper);
keeper_context->initialize(config, this);
requests_queue = std::make_unique<RequestsQueue>(configuration_and_settings->coordination_settings->max_request_queue_size);
@ -452,7 +452,7 @@ void KeeperDispatcher::shutdown()
try
{
{
if (keeper_context->shutdown_called.exchange(true))
if (!keeper_context || keeper_context->shutdown_called.exchange(true))
return;
LOG_DEBUG(log, "Shutting down storage dispatcher");

View File

@ -4,7 +4,6 @@
#include "config.h"
#include <chrono>
#include <filesystem>
#include <string>
#include <Coordination/KeeperStateMachine.h>
#include <Coordination/KeeperStateManager.h>
@ -617,6 +616,7 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ
{
const auto preprocess_logs = [&]
{
keeper_context->local_logs_preprocessed = true;
auto log_store = state_manager->load_log_store();
if (last_log_idx_on_disk > 0 && last_log_idx_on_disk > state_machine->last_commit_index())
{
@ -642,7 +642,6 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ
{
LOG_INFO(log, "All local log entries preprocessed");
}
keeper_context->local_logs_preprocessed = true;
};
switch (type)

View File

@ -1,5 +1,4 @@
#include <cerrno>
#include <future>
#include <Coordination/KeeperSnapshotManager.h>
#include <Coordination/KeeperStateMachine.h>
#include <Coordination/KeeperDispatcher.h>
@ -162,6 +161,15 @@ void assertDigest(
nuraft::ptr<nuraft::buffer> KeeperStateMachine::pre_commit(uint64_t log_idx, nuraft::buffer & data)
{
auto result = nuraft::buffer::alloc(sizeof(log_idx));
nuraft::buffer_serializer ss(result);
ss.put_u64(log_idx);
/// Don't preprocess anything until the first commit when we will manually pre_commit and commit
/// all needed logs
if (!keeper_context->local_logs_preprocessed)
return result;
auto request_for_session = parseRequest(data, /*final=*/false);
if (!request_for_session->zxid)
request_for_session->zxid = log_idx;
@ -169,9 +177,6 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::pre_commit(uint64_t log_idx, nur
request_for_session->log_idx = log_idx;
preprocess(*request_for_session);
auto result = nuraft::buffer::alloc(sizeof(log_idx));
nuraft::buffer_serializer ss(result);
ss.put_u64(log_idx);
return result;
}
@ -506,6 +511,10 @@ void KeeperStateMachine::commit_config(const uint64_t log_idx, nuraft::ptr<nuraf
void KeeperStateMachine::rollback(uint64_t log_idx, nuraft::buffer & data)
{
/// Don't rollback anything until the first commit because nothing was preprocessed
if (!keeper_context->local_logs_preprocessed)
return;
auto request_for_session = parseRequest(data, true);
// If we received a log from an older node, use the log_idx as the zxid
// log_idx will always be larger or equal to the zxid so we can safely do this

View File

@ -1048,6 +1048,7 @@ TEST_P(CoordinationTest, ChangelogTestReadAfterBrokenTruncate)
EXPECT_EQ(changelog_reader2.last_entry()->get_term(), 7777);
}
/// Truncating all entries
TEST_P(CoordinationTest, ChangelogTestReadAfterBrokenTruncate2)
{
auto params = GetParam();
@ -1102,6 +1103,61 @@ TEST_P(CoordinationTest, ChangelogTestReadAfterBrokenTruncate2)
EXPECT_EQ(changelog_reader2.last_entry()->get_term(), 7777);
}
/// Truncating only some entries from the end
TEST_P(CoordinationTest, ChangelogTestReadAfterBrokenTruncate3)
{
auto params = GetParam();
/// For compressed logs we have no reliable way of knowing how many log entries were lost
/// after we truncate some bytes from the end
if (!params.extension.empty())
return;
ChangelogDirTest test("./logs");
setLogDirectory("./logs");
DB::KeeperLogStore changelog(
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 20},
DB::FlushSettings(),
keeper_context);
changelog.init(1, 0);
for (size_t i = 0; i < 35; ++i)
{
auto entry = getLogEntry(std::to_string(i) + "_hello_world", (i + 44) * 10);
changelog.append(entry);
}
changelog.end_of_append_batch(0, 0);
waitDurableLogs(changelog);
EXPECT_TRUE(fs::exists("./logs/changelog_1_20.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_21_40.bin" + params.extension));
DB::WriteBufferFromFile plain_buf(
"./logs/changelog_1_20.bin" + params.extension, DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY);
plain_buf.truncate(plain_buf.size() - 30);
DB::KeeperLogStore changelog_reader(
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 20},
DB::FlushSettings(),
keeper_context);
changelog_reader.init(1, 0);
EXPECT_EQ(changelog_reader.size(), 19);
EXPECT_TRUE(fs::exists("./logs/changelog_1_20.bin" + params.extension));
assertBrokenLogRemoved("./logs", "changelog_21_40.bin" + params.extension);
EXPECT_TRUE(fs::exists("./logs/changelog_20_39.bin" + params.extension));
auto entry = getLogEntry("hello_world", 7777);
changelog_reader.append(entry);
changelog_reader.end_of_append_batch(0, 0);
waitDurableLogs(changelog_reader);
EXPECT_EQ(changelog_reader.size(), 20);
EXPECT_EQ(changelog_reader.last_entry()->get_term(), 7777);
}
TEST_P(CoordinationTest, ChangelogTestLostFiles)
{
auto params = GetParam();

View File

@ -0,0 +1,44 @@
<clickhouse>
<keeper_server>
<use_cluster>false</use_cluster>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<create_snapshot_on_exit>false</create_snapshot_on_exit>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<snapshot_distance>75</snapshot_distance>
<raft_logs_level>trace</raft_logs_level>
<compress_logs>false</compress_logs>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
<can_become_leader>true</can_become_leader>
<priority>3</priority>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<can_become_leader>true</can_become_leader>
<start_as_follower>true</start_as_follower>
<priority>2</priority>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<can_become_leader>true</can_become_leader>
<start_as_follower>true</start_as_follower>
<priority>1</priority>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,43 @@
<clickhouse>
<keeper_server>
<use_cluster>false</use_cluster>
<tcp_port>9181</tcp_port>
<server_id>2</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<snapshot_distance>75</snapshot_distance>
<raft_logs_level>trace</raft_logs_level>
<compress_logs>false</compress_logs>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
<can_become_leader>true</can_become_leader>
<priority>3</priority>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<can_become_leader>true</can_become_leader>
<start_as_follower>true</start_as_follower>
<priority>2</priority>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<can_become_leader>true</can_become_leader>
<start_as_follower>true</start_as_follower>
<priority>1</priority>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,43 @@
<clickhouse>
<keeper_server>
<use_cluster>false</use_cluster>
<tcp_port>9181</tcp_port>
<server_id>3</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<snapshot_distance>75</snapshot_distance>
<raft_logs_level>trace</raft_logs_level>
<compress_logs>false</compress_logs>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
<can_become_leader>true</can_become_leader>
<priority>3</priority>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<can_become_leader>true</can_become_leader>
<start_as_follower>true</start_as_follower>
<priority>2</priority>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<can_become_leader>true</can_become_leader>
<start_as_follower>true</start_as_follower>
<priority>1</priority>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,128 @@
import pytest
from helpers.cluster import ClickHouseCluster
import helpers.keeper_utils as keeper_utils
import random
import string
import os
import time
from multiprocessing.dummy import Pool
from helpers.network import PartitionManager
from helpers.test_tools import assert_eq_with_retry
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
"node1",
main_configs=["configs/enable_keeper1.xml"],
stay_alive=True,
)
node2 = cluster.add_instance(
"node2",
main_configs=["configs/enable_keeper2.xml"],
stay_alive=True,
)
node3 = cluster.add_instance(
"node3",
main_configs=["configs/enable_keeper3.xml"],
stay_alive=True,
)
from kazoo.client import KazooClient, KazooState
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def smaller_exception(ex):
return "\n".join(str(ex).split("\n")[0:2])
def wait_nodes():
keeper_utils.wait_nodes(cluster, [node1, node2, node3])
def get_fake_zk(nodename, timeout=30.0):
_fake_zk_instance = KazooClient(
hosts=cluster.get_instance_ip(nodename) + ":9181", timeout=timeout
)
_fake_zk_instance.start()
return _fake_zk_instance
def test_single_node_broken_log(started_cluster):
try:
wait_nodes()
node1_conn = get_fake_zk("node1")
# Cleanup
if node1_conn.exists("/test_broken_log") != None:
node1_conn.delete("/test_broken_log")
node1_conn.create("/test_broken_log")
for _ in range(10):
node1_conn.create(f"/test_broken_log/node", b"somedata1", sequence=True)
def verify_nodes(zk_conn):
children = zk_conn.get_children("/test_broken_log")
assert len(children) == 10
for child in children:
assert zk_conn.get("/test_broken_log/" + child)[0] == b"somedata1"
verify_nodes(node1_conn)
node1_conn.stop()
node1_conn.close()
node1.stop_clickhouse()
node1.exec_in_container(
[
"truncate",
"-s",
"-50",
"/var/lib/clickhouse/coordination/log/changelog_1_100000.bin",
]
)
node1.start_clickhouse()
keeper_utils.wait_until_connected(cluster, node1)
node1_conn = get_fake_zk("node1")
node1_conn.create(f"/test_broken_log_final_node", b"somedata1")
verify_nodes(node1_conn)
assert node1_conn.get("/test_broken_log_final_node")[0] == b"somedata1"
node2_conn = get_fake_zk("node2")
verify_nodes(node2_conn)
assert node2_conn.get("/test_broken_log_final_node")[0] == b"somedata1"
node3_conn = get_fake_zk("node2")
verify_nodes(node3_conn)
assert node3_conn.get("/test_broken_log_final_node")[0] == b"somedata1"
assert (
node1.exec_in_container(["ls", "/var/lib/clickhouse/coordination/log"])
== "changelog_1_100000.bin\nchangelog_14_100013.bin\n"
)
assert (
node2.exec_in_container(["ls", "/var/lib/clickhouse/coordination/log"])
== "changelog_1_100000.bin\n"
)
assert (
node3.exec_in_container(["ls", "/var/lib/clickhouse/coordination/log"])
== "changelog_1_100000.bin\n"
)
finally:
try:
for zk_conn in [node1_conn, node2_conn, node3_conn]:
zk_conn.stop()
zk_conn.close()
except:
pass