Fix some bugs and add snapshots validation

This commit is contained in:
alesapin 2021-03-03 14:10:24 +03:00
parent 07e39ef47a
commit e615299ae6
19 changed files with 135 additions and 41 deletions

View File

@ -539,6 +539,7 @@
M(570, DATA_TYPE_INCOMPATIBLE_WITH_PROTOBUF_FIELD) \
M(571, DATABASE_REPLICATION_FAILED) \
M(572, TOO_MANY_QUERY_PLAN_OPTIMIZATIONS) \
M(573, UNKNOWN_SNAPSHOT) \
\
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \

View File

@ -174,7 +174,6 @@ public:
readIntBinary(record.header.term, read_buf);
readIntBinary(record.header.value_type, read_buf);
readIntBinary(record.header.blob_size, read_buf);
std::cerr << "RECORD INDEX:" << record.header.index << std::endl;
if (record.header.version > CURRENT_CHANGELOG_VERSION)
throw Exception(ErrorCodes::UNKNOWN_FORMAT_VERSION, "Unsupported changelog version {} on path {}", record.header.version, filepath);
@ -207,10 +206,8 @@ public:
result.entries_read += 1;
std::cerr << "START:" << start_log_index << " RECORD: " << record.header.index << std::endl;
if (record.header.index < start_log_index)
{
std::cerr << "SKIPPING:" << record.header.index << std::endl;
continue;
}
@ -299,13 +296,10 @@ void Changelog::readChangelogAndInitWriter(size_t from_log_index)
if (!started && start_index != 1)
throw Exception(ErrorCodes::CORRUPTED_DATA, "Required to read data from {}, but we don't have any active changelogs", from_log_index);
std::cerr << "START INDEX:" << start_index << std::endl;
std::cerr << "LOGS SIZE:" << logs.size() << std::endl;
for (const auto & [key, value] : logs)
{
std::cerr << "KEY:" << key << std::endl;
}
std::cerr << "NEXT" << getNextEntryIndex() << std::endl;
/// Nothing was read. Our start index is smaller than required
if (logs.empty() && start_index != 1)
start_index--;
if (incomplete_log_index != 0)
{
/// All subsequent logs shouldn't exist. But they may exist if we crashed after writeAt started. Remove them.
@ -466,9 +460,6 @@ LogEntryPtr Changelog::getLastEntry() const
static LogEntryPtr fake_entry = nuraft::cs_new<nuraft::log_entry>(0, nuraft::buffer::alloc(sizeof(size_t)));
size_t next_index = getNextEntryIndex() - 1;
std::cerr << "NEXT INDEX:" << next_index << std::endl;
std::cerr << "START INDEX:" << start_index << std::endl;
std::cerr << "LOGS SIZE:" << logs.size() << std::endl;
auto entry = logs.find(next_index);
if (entry == logs.end())
return fake_entry;

View File

@ -22,14 +22,14 @@ struct Settings;
M(Milliseconds, heart_beat_interval_ms, 500, "Heartbeat interval between quorum nodes", 0) \
M(Milliseconds, election_timeout_lower_bound_ms, 1000, "Lower bound of election timer (avoid too often leader elections)", 0) \
M(Milliseconds, election_timeout_upper_bound_ms, 2000, "Lower bound of election timer (avoid too often leader elections)", 0) \
M(UInt64, reserved_log_items, 50000, "How many log items to store (don't remove during compaction)", 0) \
M(UInt64, snapshot_distance, 100000, "How many log items we have to collect to write new snapshot", 0) \
M(UInt64, reserved_log_items, 10000, "How many log items to store (don't remove during compaction)", 0) \
M(UInt64, snapshot_distance, 10000, "How many log items we have to collect to write new snapshot", 0) \
M(UInt64, max_stored_snapshots, 3, "How many snapshots we want to store", 0) \
M(Bool, auto_forwarding, true, "Allow to forward write requests from followers to leader", 0) \
M(Milliseconds, shutdown_timeout, 5000, "How many time we will until RAFT shutdown", 0) \
M(Milliseconds, startup_timeout, 30000, "How many time we will until RAFT to start", 0) \
M(LogsLevel, raft_logs_level, LogsLevel::information, "Log internal RAFT logs into main server log level. Valid values: 'trace', 'debug', 'information', 'warning', 'error', 'fatal', 'none'", 0) \
M(UInt64, rotate_log_storage_interval, 100000, "How many records will be stored in one log storage file", 0) \
M(UInt64, rotate_log_storage_interval, 10000, "How many records will be stored in one log storage file", 0) \
M(UInt64, snapshots_to_keep, 3, "How many compressed snapshots to keep on disk", 0) \
M(Bool, force_sync, true, " Call fsync on each change in RAFT changelog", 0)

View File

@ -41,19 +41,40 @@ NuKeeperStateMachine::NuKeeperStateMachine(ResponsesQueue & responses_queue_, co
void NuKeeperStateMachine::init()
{
LOG_DEBUG(log, "Trying to load state machine");
latest_snapshot_buf = snapshot_manager.deserializeLatestSnapshotBufferFromDisk();
if (latest_snapshot_buf)
LOG_DEBUG(log, "Totally have {} snapshots", snapshot_manager.totalSnapshots());
bool loaded = false;
bool has_snapshots = snapshot_manager.totalSnapshots() != 0;
while (snapshot_manager.totalSnapshots() != 0)
{
latest_snapshot_meta = snapshot_manager.deserializeSnapshotFromBuffer(&storage, latest_snapshot_buf);
last_committed_idx = latest_snapshot_meta->get_last_log_idx();
size_t latest_log_index = snapshot_manager.getLatestSnapshotIndex();
LOG_DEBUG(log, "Trying to load state machine from snapshot up to log index {}", latest_log_index);
try
{
latest_snapshot_buf = snapshot_manager.deserializeSnapshotBufferFromDisk(latest_log_index);
latest_snapshot_meta = snapshot_manager.deserializeSnapshotFromBuffer(&storage, latest_snapshot_buf);
last_committed_idx = latest_snapshot_meta->get_last_log_idx();
loaded = true;
break;
}
catch (const DB::Exception & ex)
{
LOG_WARNING(log, "Failed to load from snapshot with index {}, with error {}, will remove it from disk", latest_log_index, ex.displayText());
snapshot_manager.removeSnapshot(latest_log_index);
}
}
if (has_snapshots)
{
if (loaded)
LOG_DEBUG(log, "Loaded snapshot with last commited log index {}", last_committed_idx);
else
LOG_WARNING(log, "All snapshots broken, last commited log index {}", last_committed_idx);
}
else
{
latest_snapshot_meta = nullptr;
last_committed_idx = 0;
LOG_DEBUG(log, "No existing snapshots, last commited log index {}", last_committed_idx);
}
LOG_DEBUG(log, "Loaded snapshot with last commited log index {}", last_committed_idx);
}
nuraft::ptr<nuraft::buffer> NuKeeperStateMachine::commit(const size_t log_idx, nuraft::buffer & data)

View File

@ -62,8 +62,8 @@ public:
private:
SnapshotMetadataPtr latest_snapshot_meta;
nuraft::ptr<nuraft::buffer> latest_snapshot_buf;
SnapshotMetadataPtr latest_snapshot_meta = nullptr;
nuraft::ptr<nuraft::buffer> latest_snapshot_buf = nullptr;
CoordinationSettingsPtr coordination_settings;

View File

@ -17,6 +17,7 @@ namespace DB
namespace ErrorCodes
{
extern const int UNKNOWN_FORMAT_VERSION;
extern const int UNKNOWN_SNAPSHOT;
}
namespace
@ -203,6 +204,11 @@ NuKeeperSnapshotManager::NuKeeperSnapshotManager(const std::string & snapshots_p
for (const auto & p : fs::directory_iterator(snapshots_path))
{
if (startsWith(p.path(), "tmp_")) /// Unfinished tmp files
{
std::filesystem::remove(p);
continue;
}
size_t snapshot_up_to = getSnapshotPathUpToLogIdx(p.path());
existing_snapshots[snapshot_up_to] = p.path();
}
@ -215,24 +221,40 @@ std::string NuKeeperSnapshotManager::serializeSnapshotBufferToDisk(nuraft::buffe
{
ReadBufferFromNuraftBuffer reader(buffer);
std::string new_snapshot_path = std::filesystem::path{snapshots_path} / getSnapshotFileName(up_to_log_idx);
auto snapshot_file_name = getSnapshotFileName(up_to_log_idx);
auto tmp_snapshot_file_name = "tmp_" + snapshot_file_name;
std::string tmp_snapshot_path = std::filesystem::path{snapshots_path} / tmp_snapshot_file_name;
std::string new_snapshot_path = std::filesystem::path{snapshots_path} / snapshot_file_name;
WriteBufferFromFile plain_buf(new_snapshot_path);
WriteBufferFromFile plain_buf(tmp_snapshot_path);
copyData(reader, plain_buf);
plain_buf.sync();
std::filesystem::rename(tmp_snapshot_path, new_snapshot_path);
existing_snapshots.emplace(up_to_log_idx, new_snapshot_path);
removeOutdatedSnapshotsIfNeeded();
return new_snapshot_path;
}
nuraft::ptr<nuraft::buffer> NuKeeperSnapshotManager::deserializeLatestSnapshotBufferFromDisk() const
nuraft::ptr<nuraft::buffer> NuKeeperSnapshotManager::deserializeLatestSnapshotBufferFromDisk()
{
if (!existing_snapshots.empty())
while (!existing_snapshots.empty())
{
auto last_log_id = existing_snapshots.rbegin()->first;
return deserializeSnapshotBufferFromDisk(last_log_id);
auto latest_itr = existing_snapshots.rbegin();
try
{
return deserializeSnapshotBufferFromDisk(latest_itr->first);
}
catch (const DB::Exception &)
{
std::filesystem::remove(latest_itr->second);
existing_snapshots.erase(latest_itr->first);
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
return nullptr;
}
@ -262,23 +284,31 @@ SnapshotMetadataPtr NuKeeperSnapshotManager::deserializeSnapshotFromBuffer(NuKee
return NuKeeperStorageSnapshot::deserialize(*storage, compressed_reader);
}
SnapshotMetadataPtr NuKeeperSnapshotManager::restoreFromLatestSnapshot(NuKeeperStorage * storage) const
SnapshotMetadataPtr NuKeeperSnapshotManager::restoreFromLatestSnapshot(NuKeeperStorage * storage)
{
if (existing_snapshots.empty())
return nullptr;
auto log_id = existing_snapshots.rbegin()->first;
auto buffer = deserializeSnapshotBufferFromDisk(log_id);
auto buffer = deserializeLatestSnapshotBufferFromDisk();
if (!buffer)
return nullptr;
return deserializeSnapshotFromBuffer(storage, buffer);
}
void NuKeeperSnapshotManager::removeOutdatedSnapshotsIfNeeded()
{
while (existing_snapshots.size() > snapshots_to_keep)
{
std::filesystem::remove(existing_snapshots.begin()->second);
existing_snapshots.erase(existing_snapshots.begin());
}
removeSnapshot(existing_snapshots.begin()->first);
}
void NuKeeperSnapshotManager::removeSnapshot(size_t log_idx)
{
auto itr = existing_snapshots.find(log_idx);
if (itr == existing_snapshots.end())
throw Exception(ErrorCodes::UNKNOWN_SNAPSHOT, "Unknown snapshot with log index {}", log_idx);
std::filesystem::remove(itr->second);
existing_snapshots.erase(itr);
}

View File

@ -43,7 +43,7 @@ class NuKeeperSnapshotManager
public:
NuKeeperSnapshotManager(const std::string & snapshots_path_, size_t snapshots_to_keep_);
SnapshotMetadataPtr restoreFromLatestSnapshot(NuKeeperStorage * storage) const;
SnapshotMetadataPtr restoreFromLatestSnapshot(NuKeeperStorage * storage);
static nuraft::ptr<nuraft::buffer> serializeSnapshotToBuffer(const NuKeeperStorageSnapshot & snapshot);
std::string serializeSnapshotBufferToDisk(nuraft::buffer & buffer, size_t up_to_log_idx);
@ -51,7 +51,21 @@ public:
static SnapshotMetadataPtr deserializeSnapshotFromBuffer(NuKeeperStorage * storage, nuraft::ptr<nuraft::buffer> buffer);
nuraft::ptr<nuraft::buffer> deserializeSnapshotBufferFromDisk(size_t up_to_log_idx) const;
nuraft::ptr<nuraft::buffer> deserializeLatestSnapshotBufferFromDisk() const;
nuraft::ptr<nuraft::buffer> deserializeLatestSnapshotBufferFromDisk();
void removeSnapshot(size_t log_idx);
size_t totalSnapshots() const
{
return existing_snapshots.size();
}
size_t getLatestSnapshotIndex() const
{
if (!existing_snapshots.empty())
return existing_snapshots.rbegin()->first;
return 0;
}
private:
void removeOutdatedSnapshotsIfNeeded();

View File

@ -1077,6 +1077,32 @@ TEST(CoordinationTest, TestStorageSnapshotMode)
{
EXPECT_EQ(restored_storage.container.getValue("/hello_" + std::to_string(i)).data, "world_" + std::to_string(i));
}
}
TEST(CoordinationTest, TestStorageSnapshotBroken)
{
ChangelogDirTest test("./snapshots");
DB::NuKeeperSnapshotManager manager("./snapshots", 3);
DB::NuKeeperStorage storage(500);
for (size_t i = 0; i < 50; ++i)
{
addNode(storage, "/hello_" + std::to_string(i), "world_" + std::to_string(i));
}
{
DB::NuKeeperStorageSnapshot snapshot(&storage, 50);
auto buf = manager.serializeSnapshotToBuffer(snapshot);
manager.serializeSnapshotBufferToDisk(*buf, 50);
}
EXPECT_TRUE(fs::exists("./snapshots/snapshot_50.bin"));
/// Let's corrupt file
DB::WriteBufferFromFile plain_buf("./snapshots/snapshot_50.bin", DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY);
plain_buf.truncate(34);
plain_buf.sync();
DB::NuKeeperStorage restored_storage(500);
EXPECT_THROW(manager.restoreFromLatestSnapshot(&restored_storage), DB::Exception);
}
int main(int argc, char ** argv)

View File

@ -3,6 +3,7 @@
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>

View File

@ -3,6 +3,7 @@
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>

View File

@ -3,6 +3,7 @@
<tcp_port>9181</tcp_port>
<server_id>2</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>

View File

@ -3,6 +3,7 @@
<tcp_port>9181</tcp_port>
<server_id>3</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>

View File

@ -3,6 +3,7 @@
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>

View File

@ -3,6 +3,7 @@
<tcp_port>9181</tcp_port>
<server_id>2</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>

View File

@ -3,6 +3,7 @@
<tcp_port>9181</tcp_port>
<server_id>3</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>

View File

@ -3,6 +3,7 @@
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>

View File

@ -3,6 +3,7 @@
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>

View File

@ -3,6 +3,7 @@
<tcp_port>9181</tcp_port>
<server_id>2</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>

View File

@ -3,6 +3,7 @@
<tcp_port>9181</tcp_port>
<server_id>3</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>