Avoid taking lock

This commit is contained in:
Antonio Andelic 2024-06-04 14:38:47 +02:00
parent 9d30a7f056
commit 5c9aac484d
9 changed files with 85 additions and 57 deletions

View File

@ -57,7 +57,7 @@ int mainEntryClickHouseKeeperConverter(int argc, char ** argv)
DB::KeeperSnapshotManager manager(1, keeper_context);
auto snp = manager.serializeSnapshotToBuffer(snapshot);
auto file_info = manager.serializeSnapshotBufferToDisk(*snp, storage.getZXID());
std::cout << "Snapshot serialized to path:" << fs::path(file_info.disk->getPath()) / file_info.path << std::endl;
std::cout << "Snapshot serialized to path:" << fs::path(file_info->disk->getPath()) / file_info->path << std::endl;
}
catch (...)
{

View File

@ -13,8 +13,9 @@ struct CopyableAtomic
: value(other.value.load())
{}
explicit CopyableAtomic(T && value_)
: value(std::forward<T>(value_))
template <std::convertible_to<T> U>
explicit CopyableAtomic(U && value_)
: value(std::forward<U>(value_))
{}
CopyableAtomic & operator=(const CopyableAtomic & other)
@ -23,9 +24,10 @@ struct CopyableAtomic
return *this;
}
CopyableAtomic & operator=(bool value_)
template <std::convertible_to<T> U>
CopyableAtomic & operator=(U && value_)
{
value = value_;
value = std::forward<U>(value_);
return *this;
}

View File

@ -332,9 +332,10 @@ void KeeperDispatcher::snapshotThread()
if (shutdown_called)
break;
if (snapshot_file_info.path.empty())
if (!snapshot_file_info)
continue;
chassert(snapshot_file_info->disk != nullptr);
if (isLeader())
snapshot_s3.uploadSnapshot(snapshot_file_info);
}

View File

@ -618,7 +618,7 @@ KeeperSnapshotManager::KeeperSnapshotManager(
LOG_TRACE(log, "Found {} on {}", snapshot_file, disk->getName());
size_t snapshot_up_to = getSnapshotPathUpToLogIdx(snapshot_file);
auto [_, inserted] = existing_snapshots.insert_or_assign(snapshot_up_to, SnapshotFileInfo{snapshot_file, disk});
auto [_, inserted] = existing_snapshots.insert_or_assign(snapshot_up_to, std::make_shared<SnapshotFileInfo>(snapshot_file, disk));
if (!inserted)
LOG_WARNING(
@ -651,7 +651,7 @@ KeeperSnapshotManager::KeeperSnapshotManager(
moveSnapshotsIfNeeded();
}
SnapshotFileInfo KeeperSnapshotManager::serializeSnapshotBufferToDisk(nuraft::buffer & buffer, uint64_t up_to_log_idx)
SnapshotFileInfoPtr KeeperSnapshotManager::serializeSnapshotBufferToDisk(nuraft::buffer & buffer, uint64_t up_to_log_idx)
{
ReadBufferFromNuraftBuffer reader(buffer);
@ -672,11 +672,12 @@ SnapshotFileInfo KeeperSnapshotManager::serializeSnapshotBufferToDisk(nuraft::bu
disk->removeFile(tmp_snapshot_file_name);
existing_snapshots.emplace(up_to_log_idx, SnapshotFileInfo{snapshot_file_name, disk});
auto snapshot_file_info = std::make_shared<SnapshotFileInfo>(snapshot_file_name, disk);
existing_snapshots.emplace(up_to_log_idx, snapshot_file_info);
removeOutdatedSnapshotsIfNeeded();
moveSnapshotsIfNeeded();
return {snapshot_file_name, disk};
return snapshot_file_info;
}
nuraft::ptr<nuraft::buffer> KeeperSnapshotManager::deserializeLatestSnapshotBufferFromDisk()
@ -690,7 +691,7 @@ nuraft::ptr<nuraft::buffer> KeeperSnapshotManager::deserializeLatestSnapshotBuff
}
catch (const DB::Exception &)
{
const auto & [path, disk, size] = latest_itr->second;
const auto & [path, disk, size] = *latest_itr->second;
disk->removeFile(path);
existing_snapshots.erase(latest_itr->first);
tryLogCurrentException(__PRETTY_FUNCTION__);
@ -702,7 +703,7 @@ nuraft::ptr<nuraft::buffer> KeeperSnapshotManager::deserializeLatestSnapshotBuff
nuraft::ptr<nuraft::buffer> KeeperSnapshotManager::deserializeSnapshotBufferFromDisk(uint64_t up_to_log_idx) const
{
const auto & [snapshot_path, snapshot_disk, size] = existing_snapshots.at(up_to_log_idx);
const auto & [snapshot_path, snapshot_disk, size] = *existing_snapshots.at(up_to_log_idx);
WriteBufferFromNuraftBuffer writer;
auto reader = snapshot_disk->readFile(snapshot_path);
copyData(*reader, writer);
@ -794,18 +795,18 @@ void KeeperSnapshotManager::moveSnapshotsIfNeeded()
{
if (idx == latest_snapshot_idx)
{
if (file_info.disk != latest_snapshot_disk)
if (file_info->disk != latest_snapshot_disk)
{
moveSnapshotBetweenDisks(file_info.disk, file_info.path, latest_snapshot_disk, file_info.path, keeper_context);
file_info.disk = latest_snapshot_disk;
moveSnapshotBetweenDisks(file_info->disk, file_info->path, latest_snapshot_disk, file_info->path, keeper_context);
file_info->disk = latest_snapshot_disk;
}
}
else
{
if (file_info.disk != disk)
if (file_info->disk != disk)
{
moveSnapshotBetweenDisks(file_info.disk, file_info.path, disk, file_info.path, keeper_context);
file_info.disk = disk;
moveSnapshotBetweenDisks(file_info->disk, file_info->path, disk, file_info->path, keeper_context);
file_info->disk = disk;
}
}
}
@ -817,12 +818,12 @@ void KeeperSnapshotManager::removeSnapshot(uint64_t log_idx)
auto itr = existing_snapshots.find(log_idx);
if (itr == existing_snapshots.end())
throw Exception(ErrorCodes::UNKNOWN_SNAPSHOT, "Unknown snapshot with log index {}", log_idx);
const auto & [path, disk, size] = itr->second;
const auto & [path, disk, size] = *itr->second;
disk->removeFileIfExists(path);
existing_snapshots.erase(itr);
}
SnapshotFileInfo KeeperSnapshotManager::serializeSnapshotToDisk(const KeeperStorageSnapshot & snapshot)
SnapshotFileInfoPtr KeeperSnapshotManager::serializeSnapshotToDisk(const KeeperStorageSnapshot & snapshot)
{
auto up_to_log_idx = snapshot.snapshot_meta->get_last_log_idx();
auto snapshot_file_name = getSnapshotFileName(up_to_log_idx, compress_snapshots_zstd);
@ -847,7 +848,8 @@ SnapshotFileInfo KeeperSnapshotManager::serializeSnapshotToDisk(const KeeperStor
disk->removeFile(tmp_snapshot_file_name);
existing_snapshots.emplace(up_to_log_idx, SnapshotFileInfo{snapshot_file_name, disk});
auto snapshot_file_info = std::make_shared<SnapshotFileInfo>(snapshot_file_name, disk);
existing_snapshots.emplace(up_to_log_idx, snapshot_file_info);
try
{
@ -859,7 +861,7 @@ SnapshotFileInfo KeeperSnapshotManager::serializeSnapshotToDisk(const KeeperStor
tryLogCurrentException(log, "Failed to cleanup and/or move older snapshots");
}
return {snapshot_file_name, disk};
return snapshot_file_info;
}
size_t KeeperSnapshotManager::getLatestSnapshotIndex() const
@ -869,23 +871,23 @@ size_t KeeperSnapshotManager::getLatestSnapshotIndex() const
return 0;
}
SnapshotFileInfo KeeperSnapshotManager::getLatestSnapshotInfo() const
SnapshotFileInfoPtr KeeperSnapshotManager::getLatestSnapshotInfo() const
{
if (!existing_snapshots.empty())
{
const auto & [path, disk, size] = existing_snapshots.at(getLatestSnapshotIndex());
const auto & [path, disk, size] = *existing_snapshots.at(getLatestSnapshotIndex());
try
{
if (disk->exists(path))
return {path, disk};
return std::make_shared<SnapshotFileInfo>(path, disk);
}
catch (...)
{
tryLogCurrentException(log);
}
}
return {"", nullptr};
return nullptr;
}
}

View File

@ -1,5 +1,6 @@
#pragma once
#include <Coordination/KeeperStorage.h>
#include <Common/CopyableAtomic.h>
#include <libnuraft/nuraft.hxx>
namespace DB
@ -93,13 +94,20 @@ public:
struct SnapshotFileInfo
{
SnapshotFileInfo(std::string path_, DiskPtr disk_)
: path(std::move(path_))
, disk(std::move(disk_))
{}
std::string path;
DiskPtr disk;
mutable std::optional<size_t> size = std::nullopt;
mutable std::atomic<size_t> size{0};
};
using SnapshotFileInfoPtr = std::shared_ptr<SnapshotFileInfo>;
using KeeperStorageSnapshotPtr = std::shared_ptr<KeeperStorageSnapshot>;
using CreateSnapshotCallback = std::function<SnapshotFileInfo(KeeperStorageSnapshotPtr &&, bool)>;
using CreateSnapshotCallback = std::function<std::shared_ptr<SnapshotFileInfo>(KeeperStorageSnapshotPtr &&, bool)>;
using SnapshotMetaAndStorage = std::pair<SnapshotMetadataPtr, KeeperStoragePtr>;
@ -122,10 +130,10 @@ public:
nuraft::ptr<nuraft::buffer> serializeSnapshotToBuffer(const KeeperStorageSnapshot & snapshot) const;
/// Serialize already compressed snapshot to disk (return path)
SnapshotFileInfo serializeSnapshotBufferToDisk(nuraft::buffer & buffer, uint64_t up_to_log_idx);
SnapshotFileInfoPtr serializeSnapshotBufferToDisk(nuraft::buffer & buffer, uint64_t up_to_log_idx);
/// Serialize snapshot directly to disk
SnapshotFileInfo serializeSnapshotToDisk(const KeeperStorageSnapshot & snapshot);
SnapshotFileInfoPtr serializeSnapshotToDisk(const KeeperStorageSnapshot & snapshot);
SnapshotDeserializationResult deserializeSnapshotFromBuffer(nuraft::ptr<nuraft::buffer> buffer) const;
@ -144,7 +152,7 @@ public:
/// The most fresh snapshot log index we have
size_t getLatestSnapshotIndex() const;
SnapshotFileInfo getLatestSnapshotInfo() const;
SnapshotFileInfoPtr getLatestSnapshotInfo() const;
private:
void removeOutdatedSnapshotsIfNeeded();
@ -160,7 +168,7 @@ private:
/// How many snapshots to keep before remove
const size_t snapshots_to_keep;
/// All existing snapshots in our path (log_index -> path)
std::map<uint64_t, SnapshotFileInfo> existing_snapshots;
std::map<uint64_t, SnapshotFileInfoPtr> existing_snapshots;
/// Compress snapshots in common ZSTD format instead of custom ClickHouse block LZ4 format
const bool compress_snapshots_zstd;
/// Superdigest for deserialization of storage

View File

@ -145,9 +145,9 @@ std::shared_ptr<KeeperSnapshotManagerS3::S3Configuration> KeeperSnapshotManagerS
return snapshot_s3_client;
}
void KeeperSnapshotManagerS3::uploadSnapshotImpl(const SnapshotFileInfo & snapshot_file_info)
void KeeperSnapshotManagerS3::uploadSnapshotImpl(const SnapshotFileInfoPtr & snapshot_file_info)
{
const auto & [snapshot_path, snapshot_disk, snapshot_size] = snapshot_file_info;
const auto & [snapshot_path, snapshot_disk, snapshot_size] = *snapshot_file_info;
try
{
auto s3_client = getSnapshotS3Client();
@ -169,9 +169,9 @@ void KeeperSnapshotManagerS3::uploadSnapshotImpl(const SnapshotFileInfo & snapsh
);
};
LOG_INFO(log, "Will try to upload snapshot on {} to S3", snapshot_file_info.path);
LOG_INFO(log, "Will try to upload snapshot on {} to S3", snapshot_path);
auto snapshot_file = snapshot_disk->readFile(snapshot_file_info.path);
auto snapshot_file = snapshot_disk->readFile(snapshot_path);
auto snapshot_name = fs::path(snapshot_path).filename().string();
auto lock_file = fmt::format(".{}_LOCK", snapshot_name);
@ -261,7 +261,7 @@ void KeeperSnapshotManagerS3::snapshotS3Thread()
while (!shutdown_called)
{
SnapshotFileInfo snapshot_file_info;
SnapshotFileInfoPtr snapshot_file_info;
if (!snapshots_s3_queue.pop(snapshot_file_info))
break;
@ -272,7 +272,7 @@ void KeeperSnapshotManagerS3::snapshotS3Thread()
}
}
void KeeperSnapshotManagerS3::uploadSnapshot(const SnapshotFileInfo & file_info, bool async_upload)
void KeeperSnapshotManagerS3::uploadSnapshot(const SnapshotFileInfoPtr & file_info, bool async_upload)
{
if (getSnapshotS3Client() == nullptr)
return;
@ -280,7 +280,7 @@ void KeeperSnapshotManagerS3::uploadSnapshot(const SnapshotFileInfo & file_info,
if (async_upload)
{
if (!snapshots_s3_queue.push(file_info))
LOG_WARNING(log, "Failed to add snapshot {} to S3 queue", file_info.path);
LOG_WARNING(log, "Failed to add snapshot {} to S3 queue", file_info->path);
return;
}

View File

@ -12,8 +12,6 @@
#include <Common/ConcurrentBoundedQueue.h>
#include <Common/ThreadPool.h>
#include <string>
#endif
namespace DB
@ -27,13 +25,13 @@ public:
/// 'macros' are used to substitute macros in endpoint of disks
void updateS3Configuration(const Poco::Util::AbstractConfiguration & config, const MultiVersion<Macros>::Version & macros);
void uploadSnapshot(const SnapshotFileInfo & file_info, bool async_upload = true);
void uploadSnapshot(const SnapshotFileInfoPtr & file_info, bool async_upload = true);
/// 'macros' are used to substitute macros in endpoint of disks
void startup(const Poco::Util::AbstractConfiguration & config, const MultiVersion<Macros>::Version & macros);
void shutdown();
private:
using SnapshotS3Queue = ConcurrentBoundedQueue<SnapshotFileInfo>;
using SnapshotS3Queue = ConcurrentBoundedQueue<SnapshotFileInfoPtr>;
SnapshotS3Queue snapshots_s3_queue;
/// Upload new snapshots to S3
@ -51,7 +49,7 @@ private:
std::shared_ptr<S3Configuration> getSnapshotS3Client() const;
void uploadSnapshotImpl(const SnapshotFileInfo & snapshot_file_info);
void uploadSnapshotImpl(const SnapshotFileInfoPtr & snapshot_file_info);
/// Thread upload snapshots to S3 in the background
void snapshotS3Thread();

View File

@ -1,3 +1,4 @@
#include <atomic>
#include <cerrno>
#include <Coordination/KeeperSnapshotManager.h>
#include <Coordination/KeeperStateMachine.h>
@ -90,8 +91,9 @@ void KeeperStateMachine::init()
latest_snapshot_buf = snapshot_manager.deserializeSnapshotBufferFromDisk(latest_log_index);
auto snapshot_deserialization_result = snapshot_manager.deserializeSnapshotFromBuffer(latest_snapshot_buf);
latest_snapshot_info = snapshot_manager.getLatestSnapshotInfo();
chassert(latest_snapshot_info);
if (isLocalDisk(*latest_snapshot_info.disk))
if (isLocalDisk(*latest_snapshot_info->disk))
latest_snapshot_buf = nullptr;
storage = std::move(snapshot_deserialization_result.storage);
@ -603,7 +605,11 @@ void KeeperStateMachine::create_snapshot(nuraft::snapshot & s, nuraft::async_res
}
ProfileEvents::increment(ProfileEvents::KeeperSnapshotCreations);
LOG_DEBUG(log, "Created persistent snapshot {} with path {}", latest_snapshot_meta->get_last_log_idx(), latest_snapshot_info.path);
LOG_DEBUG(
log,
"Created persistent snapshot {} with path {}",
latest_snapshot_meta->get_last_log_idx(),
latest_snapshot_info->path);
}
}
}
@ -627,7 +633,7 @@ void KeeperStateMachine::create_snapshot(nuraft::snapshot & s, nuraft::async_res
when_done(ret, exception);
return ret ? latest_snapshot_info : SnapshotFileInfo{};
return ret ? latest_snapshot_info : nullptr;
};
if (keeper_context->getServerState() == KeeperContext::Phase::SHUTDOWN)
@ -635,9 +641,9 @@ void KeeperStateMachine::create_snapshot(nuraft::snapshot & s, nuraft::async_res
LOG_INFO(log, "Creating a snapshot during shutdown because 'create_snapshot_on_exit' is enabled.");
auto snapshot_file_info = snapshot_task.create_snapshot(std::move(snapshot_task.snapshot), /*execute_only_cleanup=*/false);
if (!snapshot_file_info.path.empty() && snapshot_manager_s3)
if (snapshot_file_info && snapshot_manager_s3)
{
LOG_INFO(log, "Uploading snapshot {} during shutdown because 'upload_snapshot_on_exit' is enabled.", snapshot_file_info.path);
LOG_INFO(log, "Uploading snapshot {} during shutdown because 'upload_snapshot_on_exit' is enabled.", snapshot_file_info->path);
snapshot_manager_s3->uploadSnapshot(snapshot_file_info, /* asnyc_upload */ false);
}
@ -672,7 +678,7 @@ void KeeperStateMachine::save_logical_snp_obj(
latest_snapshot_info = snapshot_manager.serializeSnapshotBufferToDisk(data, s.get_last_log_idx());
latest_snapshot_meta = cloned_meta;
latest_snapshot_buf = std::move(cloned_buffer);
LOG_DEBUG(log, "Saved snapshot {} to path {}", s.get_last_log_idx(), latest_snapshot_info.path);
LOG_DEBUG(log, "Saved snapshot {} to path {}", s.get_last_log_idx(), latest_snapshot_info->path);
obj_id++;
ProfileEvents::increment(ProfileEvents::KeeperSaveSnapshot);
}
@ -733,7 +739,7 @@ int KeeperStateMachine::read_logical_snp_obj(
return -1;
}
const auto & [path, disk, size] = latest_snapshot_info;
const auto & [path, disk, size] = *latest_snapshot_info;
if (isLocalDisk(*disk))
{
auto full_path = fs::path(disk->getPath()) / path;
@ -864,14 +870,25 @@ uint64_t KeeperStateMachine::getKeyArenaSize() const
uint64_t KeeperStateMachine::getLatestSnapshotSize() const
{
std::lock_guard lock(snapshots_lock);
if (latest_snapshot_info.disk == nullptr)
auto snapshot_info = [&]
{
std::lock_guard lock(snapshots_lock);
return latest_snapshot_info;
}();
if (snapshot_info == nullptr || snapshot_info->disk == nullptr)
return 0;
if (!latest_snapshot_info.size.has_value())
latest_snapshot_info.size = latest_snapshot_info.disk->getFileSize(latest_snapshot_info.path);
/// there is a possibility multiple threads can try to get size
/// this can happen in rare cases while it's not a heavy operation
size_t size = snapshot_info->size.load(std::memory_order_relaxed);
if (size == 0)
{
size = snapshot_info->disk->getFileSize(snapshot_info->path);
snapshot_info->size.store(size, std::memory_order_relaxed);
}
return *latest_snapshot_info.size;
return size;
}
ClusterConfigPtr KeeperStateMachine::getClusterConfig() const

View File

@ -135,7 +135,7 @@ private:
/// In our state machine we always have a single snapshot which is stored
/// in memory in compressed (serialized) format.
SnapshotMetadataPtr latest_snapshot_meta = nullptr;
SnapshotFileInfo latest_snapshot_info;
std::shared_ptr<SnapshotFileInfo> latest_snapshot_info;
nuraft::ptr<nuraft::buffer> latest_snapshot_buf = nullptr;
/// Main state machine logic