mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 08:40:50 +00:00
Merge pull request #57047 from ClickHouse/keeper-small-fixes
Small Keeper fixes
This commit is contained in:
commit
b18cb1e5cd
@ -617,8 +617,13 @@ Changelog::Changelog(
|
|||||||
|
|
||||||
/// Load all files on changelog disks
|
/// Load all files on changelog disks
|
||||||
|
|
||||||
|
std::unordered_set<DiskPtr> read_disks;
|
||||||
|
|
||||||
const auto load_from_disk = [&](const auto & disk)
|
const auto load_from_disk = [&](const auto & disk)
|
||||||
{
|
{
|
||||||
|
if (read_disks.contains(disk))
|
||||||
|
return;
|
||||||
|
|
||||||
LOG_TRACE(log, "Reading from disk {}", disk->getName());
|
LOG_TRACE(log, "Reading from disk {}", disk->getName());
|
||||||
std::unordered_map<std::string, std::string> incomplete_files;
|
std::unordered_map<std::string, std::string> incomplete_files;
|
||||||
|
|
||||||
@ -639,19 +644,25 @@ Changelog::Changelog(
|
|||||||
std::vector<std::string> changelog_files;
|
std::vector<std::string> changelog_files;
|
||||||
for (auto it = disk->iterateDirectory(""); it->isValid(); it->next())
|
for (auto it = disk->iterateDirectory(""); it->isValid(); it->next())
|
||||||
{
|
{
|
||||||
if (it->name() == changelogs_detached_dir)
|
const auto & file_name = it->name();
|
||||||
|
if (file_name == changelogs_detached_dir)
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
if (it->name().starts_with(tmp_prefix))
|
if (file_name.starts_with(tmp_prefix))
|
||||||
{
|
{
|
||||||
incomplete_files.emplace(it->name().substr(tmp_prefix.size()), it->path());
|
incomplete_files.emplace(file_name.substr(tmp_prefix.size()), it->path());
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (clean_incomplete_file(it->path()))
|
if (file_name.starts_with(DEFAULT_PREFIX))
|
||||||
continue;
|
{
|
||||||
|
if (!clean_incomplete_file(it->path()))
|
||||||
changelog_files.push_back(it->path());
|
changelog_files.push_back(it->path());
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
LOG_WARNING(log, "Unknown file found in log directory: {}", file_name);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for (const auto & changelog_file : changelog_files)
|
for (const auto & changelog_file : changelog_files)
|
||||||
@ -671,6 +682,8 @@ Changelog::Changelog(
|
|||||||
|
|
||||||
for (const auto & [name, path] : incomplete_files)
|
for (const auto & [name, path] : incomplete_files)
|
||||||
disk->removeFile(path);
|
disk->removeFile(path);
|
||||||
|
|
||||||
|
read_disks.insert(disk);
|
||||||
};
|
};
|
||||||
|
|
||||||
/// Load all files from old disks
|
/// Load all files from old disks
|
||||||
|
@ -539,8 +539,12 @@ KeeperSnapshotManager::KeeperSnapshotManager(
|
|||||||
, storage_tick_time(storage_tick_time_)
|
, storage_tick_time(storage_tick_time_)
|
||||||
, keeper_context(keeper_context_)
|
, keeper_context(keeper_context_)
|
||||||
{
|
{
|
||||||
|
std::unordered_set<DiskPtr> read_disks;
|
||||||
const auto load_snapshot_from_disk = [&](const auto & disk)
|
const auto load_snapshot_from_disk = [&](const auto & disk)
|
||||||
{
|
{
|
||||||
|
if (read_disks.contains(disk))
|
||||||
|
return;
|
||||||
|
|
||||||
LOG_TRACE(log, "Reading from disk {}", disk->getName());
|
LOG_TRACE(log, "Reading from disk {}", disk->getName());
|
||||||
std::unordered_map<std::string, std::string> incomplete_files;
|
std::unordered_map<std::string, std::string> incomplete_files;
|
||||||
|
|
||||||
@ -590,6 +594,8 @@ KeeperSnapshotManager::KeeperSnapshotManager(
|
|||||||
|
|
||||||
for (const auto & [name, path] : incomplete_files)
|
for (const auto & [name, path] : incomplete_files)
|
||||||
disk->removeFile(path);
|
disk->removeFile(path);
|
||||||
|
|
||||||
|
read_disks.insert(disk);
|
||||||
};
|
};
|
||||||
|
|
||||||
for (const auto & disk : keeper_context->getOldSnapshotDisks())
|
for (const auto & disk : keeper_context->getOldSnapshotDisks())
|
||||||
|
@ -269,6 +269,7 @@ KeeperStateManager::KeeperStateManager(
|
|||||||
void KeeperStateManager::loadLogStore(uint64_t last_commited_index, uint64_t logs_to_keep)
|
void KeeperStateManager::loadLogStore(uint64_t last_commited_index, uint64_t logs_to_keep)
|
||||||
{
|
{
|
||||||
log_store->init(last_commited_index, logs_to_keep);
|
log_store->init(last_commited_index, logs_to_keep);
|
||||||
|
log_store_initialized = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void KeeperStateManager::system_exit(const int /* exit_code */)
|
void KeeperStateManager::system_exit(const int /* exit_code */)
|
||||||
@ -361,6 +362,8 @@ void KeeperStateManager::save_state(const nuraft::srv_state & state)
|
|||||||
|
|
||||||
nuraft::ptr<nuraft::srv_state> KeeperStateManager::read_state()
|
nuraft::ptr<nuraft::srv_state> KeeperStateManager::read_state()
|
||||||
{
|
{
|
||||||
|
chassert(log_store_initialized);
|
||||||
|
|
||||||
const auto & old_path = getOldServerStatePath();
|
const auto & old_path = getOldServerStatePath();
|
||||||
|
|
||||||
auto disk = getStateFileDisk();
|
auto disk = getStateFileDisk();
|
||||||
@ -454,7 +457,12 @@ nuraft::ptr<nuraft::srv_state> KeeperStateManager::read_state()
|
|||||||
disk->removeFile(copy_lock_file);
|
disk->removeFile(copy_lock_file);
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG_WARNING(logger, "No state was read");
|
if (log_store->next_slot() != 1)
|
||||||
|
LOG_ERROR(
|
||||||
|
logger,
|
||||||
|
"No state was read but Keeper contains data which indicates that the state file was lost. This is dangerous and can lead to "
|
||||||
|
"data loss.");
|
||||||
|
|
||||||
return nullptr;
|
return nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -121,6 +121,7 @@ private:
|
|||||||
mutable std::mutex configuration_wrapper_mutex;
|
mutable std::mutex configuration_wrapper_mutex;
|
||||||
KeeperConfigurationWrapper configuration_wrapper TSA_GUARDED_BY(configuration_wrapper_mutex);
|
KeeperConfigurationWrapper configuration_wrapper TSA_GUARDED_BY(configuration_wrapper_mutex);
|
||||||
|
|
||||||
|
bool log_store_initialized = false;
|
||||||
nuraft::ptr<KeeperLogStore> log_store;
|
nuraft::ptr<KeeperLogStore> log_store;
|
||||||
|
|
||||||
const String server_state_file_name;
|
const String server_state_file_name;
|
||||||
|
Loading…
Reference in New Issue
Block a user