mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 08:32:02 +00:00
Merge pull request #64784 from ClickHouse/keeper-correct-snapshot-size
Fix Keeper snapshot size in `mntr`
This commit is contained in:
commit
0f23f9b384
@ -57,7 +57,7 @@ int mainEntryClickHouseKeeperConverter(int argc, char ** argv)
|
||||
DB::KeeperSnapshotManager manager(1, keeper_context);
|
||||
auto snp = manager.serializeSnapshotToBuffer(snapshot);
|
||||
auto file_info = manager.serializeSnapshotBufferToDisk(*snp, storage.getZXID());
|
||||
std::cout << "Snapshot serialized to path:" << fs::path(file_info.disk->getPath()) / file_info.path << std::endl;
|
||||
std::cout << "Snapshot serialized to path:" << fs::path(file_info->disk->getPath()) / file_info->path << std::endl;
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
|
@ -13,8 +13,9 @@ struct CopyableAtomic
|
||||
: value(other.value.load())
|
||||
{}
|
||||
|
||||
explicit CopyableAtomic(T && value_)
|
||||
: value(std::forward<T>(value_))
|
||||
template <std::convertible_to<T> U>
|
||||
explicit CopyableAtomic(U && value_)
|
||||
: value(std::forward<U>(value_))
|
||||
{}
|
||||
|
||||
CopyableAtomic & operator=(const CopyableAtomic & other)
|
||||
@ -23,9 +24,10 @@ struct CopyableAtomic
|
||||
return *this;
|
||||
}
|
||||
|
||||
CopyableAtomic & operator=(bool value_)
|
||||
template <std::convertible_to<T> U>
|
||||
CopyableAtomic & operator=(U && value_)
|
||||
{
|
||||
value = value_;
|
||||
value = std::forward<U>(value_);
|
||||
return *this;
|
||||
}
|
||||
|
||||
|
@ -305,7 +305,7 @@ String MonitorCommand::run()
|
||||
print(ret, "ephemerals_count", state_machine.getTotalEphemeralNodesCount());
|
||||
print(ret, "approximate_data_size", state_machine.getApproximateDataSize());
|
||||
print(ret, "key_arena_size", state_machine.getKeyArenaSize());
|
||||
print(ret, "latest_snapshot_size", state_machine.getLatestSnapshotBufSize());
|
||||
print(ret, "latest_snapshot_size", state_machine.getLatestSnapshotSize());
|
||||
|
||||
#if defined(OS_LINUX) || defined(OS_DARWIN)
|
||||
print(ret, "open_file_descriptor_count", getCurrentProcessFDCount());
|
||||
|
@ -332,9 +332,10 @@ void KeeperDispatcher::snapshotThread()
|
||||
if (shutdown_called)
|
||||
break;
|
||||
|
||||
if (snapshot_file_info.path.empty())
|
||||
if (!snapshot_file_info)
|
||||
continue;
|
||||
|
||||
chassert(snapshot_file_info->disk != nullptr);
|
||||
if (isLeader())
|
||||
snapshot_s3.uploadSnapshot(snapshot_file_info);
|
||||
}
|
||||
|
@ -618,7 +618,7 @@ KeeperSnapshotManager::KeeperSnapshotManager(
|
||||
|
||||
LOG_TRACE(log, "Found {} on {}", snapshot_file, disk->getName());
|
||||
size_t snapshot_up_to = getSnapshotPathUpToLogIdx(snapshot_file);
|
||||
auto [_, inserted] = existing_snapshots.insert_or_assign(snapshot_up_to, SnapshotFileInfo{snapshot_file, disk});
|
||||
auto [_, inserted] = existing_snapshots.insert_or_assign(snapshot_up_to, std::make_shared<SnapshotFileInfo>(snapshot_file, disk));
|
||||
|
||||
if (!inserted)
|
||||
LOG_WARNING(
|
||||
@ -651,7 +651,7 @@ KeeperSnapshotManager::KeeperSnapshotManager(
|
||||
moveSnapshotsIfNeeded();
|
||||
}
|
||||
|
||||
SnapshotFileInfo KeeperSnapshotManager::serializeSnapshotBufferToDisk(nuraft::buffer & buffer, uint64_t up_to_log_idx)
|
||||
SnapshotFileInfoPtr KeeperSnapshotManager::serializeSnapshotBufferToDisk(nuraft::buffer & buffer, uint64_t up_to_log_idx)
|
||||
{
|
||||
ReadBufferFromNuraftBuffer reader(buffer);
|
||||
|
||||
@ -672,11 +672,12 @@ SnapshotFileInfo KeeperSnapshotManager::serializeSnapshotBufferToDisk(nuraft::bu
|
||||
|
||||
disk->removeFile(tmp_snapshot_file_name);
|
||||
|
||||
existing_snapshots.emplace(up_to_log_idx, SnapshotFileInfo{snapshot_file_name, disk});
|
||||
auto snapshot_file_info = std::make_shared<SnapshotFileInfo>(snapshot_file_name, disk);
|
||||
existing_snapshots.emplace(up_to_log_idx, snapshot_file_info);
|
||||
removeOutdatedSnapshotsIfNeeded();
|
||||
moveSnapshotsIfNeeded();
|
||||
|
||||
return {snapshot_file_name, disk};
|
||||
return snapshot_file_info;
|
||||
}
|
||||
|
||||
nuraft::ptr<nuraft::buffer> KeeperSnapshotManager::deserializeLatestSnapshotBufferFromDisk()
|
||||
@ -690,7 +691,7 @@ nuraft::ptr<nuraft::buffer> KeeperSnapshotManager::deserializeLatestSnapshotBuff
|
||||
}
|
||||
catch (const DB::Exception &)
|
||||
{
|
||||
const auto & [path, disk] = latest_itr->second;
|
||||
const auto & [path, disk, size] = *latest_itr->second;
|
||||
disk->removeFile(path);
|
||||
existing_snapshots.erase(latest_itr->first);
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
@ -702,7 +703,7 @@ nuraft::ptr<nuraft::buffer> KeeperSnapshotManager::deserializeLatestSnapshotBuff
|
||||
|
||||
nuraft::ptr<nuraft::buffer> KeeperSnapshotManager::deserializeSnapshotBufferFromDisk(uint64_t up_to_log_idx) const
|
||||
{
|
||||
const auto & [snapshot_path, snapshot_disk] = existing_snapshots.at(up_to_log_idx);
|
||||
const auto & [snapshot_path, snapshot_disk, size] = *existing_snapshots.at(up_to_log_idx);
|
||||
WriteBufferFromNuraftBuffer writer;
|
||||
auto reader = snapshot_disk->readFile(snapshot_path);
|
||||
copyData(*reader, writer);
|
||||
@ -794,18 +795,18 @@ void KeeperSnapshotManager::moveSnapshotsIfNeeded()
|
||||
{
|
||||
if (idx == latest_snapshot_idx)
|
||||
{
|
||||
if (file_info.disk != latest_snapshot_disk)
|
||||
if (file_info->disk != latest_snapshot_disk)
|
||||
{
|
||||
moveSnapshotBetweenDisks(file_info.disk, file_info.path, latest_snapshot_disk, file_info.path, keeper_context);
|
||||
file_info.disk = latest_snapshot_disk;
|
||||
moveSnapshotBetweenDisks(file_info->disk, file_info->path, latest_snapshot_disk, file_info->path, keeper_context);
|
||||
file_info->disk = latest_snapshot_disk;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if (file_info.disk != disk)
|
||||
if (file_info->disk != disk)
|
||||
{
|
||||
moveSnapshotBetweenDisks(file_info.disk, file_info.path, disk, file_info.path, keeper_context);
|
||||
file_info.disk = disk;
|
||||
moveSnapshotBetweenDisks(file_info->disk, file_info->path, disk, file_info->path, keeper_context);
|
||||
file_info->disk = disk;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -817,12 +818,12 @@ void KeeperSnapshotManager::removeSnapshot(uint64_t log_idx)
|
||||
auto itr = existing_snapshots.find(log_idx);
|
||||
if (itr == existing_snapshots.end())
|
||||
throw Exception(ErrorCodes::UNKNOWN_SNAPSHOT, "Unknown snapshot with log index {}", log_idx);
|
||||
const auto & [path, disk] = itr->second;
|
||||
const auto & [path, disk, size] = *itr->second;
|
||||
disk->removeFileIfExists(path);
|
||||
existing_snapshots.erase(itr);
|
||||
}
|
||||
|
||||
SnapshotFileInfo KeeperSnapshotManager::serializeSnapshotToDisk(const KeeperStorageSnapshot & snapshot)
|
||||
SnapshotFileInfoPtr KeeperSnapshotManager::serializeSnapshotToDisk(const KeeperStorageSnapshot & snapshot)
|
||||
{
|
||||
auto up_to_log_idx = snapshot.snapshot_meta->get_last_log_idx();
|
||||
auto snapshot_file_name = getSnapshotFileName(up_to_log_idx, compress_snapshots_zstd);
|
||||
@ -847,7 +848,8 @@ SnapshotFileInfo KeeperSnapshotManager::serializeSnapshotToDisk(const KeeperStor
|
||||
|
||||
disk->removeFile(tmp_snapshot_file_name);
|
||||
|
||||
existing_snapshots.emplace(up_to_log_idx, SnapshotFileInfo{snapshot_file_name, disk});
|
||||
auto snapshot_file_info = std::make_shared<SnapshotFileInfo>(snapshot_file_name, disk);
|
||||
existing_snapshots.emplace(up_to_log_idx, snapshot_file_info);
|
||||
|
||||
try
|
||||
{
|
||||
@ -859,7 +861,7 @@ SnapshotFileInfo KeeperSnapshotManager::serializeSnapshotToDisk(const KeeperStor
|
||||
tryLogCurrentException(log, "Failed to cleanup and/or move older snapshots");
|
||||
}
|
||||
|
||||
return {snapshot_file_name, disk};
|
||||
return snapshot_file_info;
|
||||
}
|
||||
|
||||
size_t KeeperSnapshotManager::getLatestSnapshotIndex() const
|
||||
@ -869,23 +871,23 @@ size_t KeeperSnapshotManager::getLatestSnapshotIndex() const
|
||||
return 0;
|
||||
}
|
||||
|
||||
SnapshotFileInfo KeeperSnapshotManager::getLatestSnapshotInfo() const
|
||||
SnapshotFileInfoPtr KeeperSnapshotManager::getLatestSnapshotInfo() const
|
||||
{
|
||||
if (!existing_snapshots.empty())
|
||||
{
|
||||
const auto & [path, disk] = existing_snapshots.at(getLatestSnapshotIndex());
|
||||
const auto & [path, disk, size] = *existing_snapshots.at(getLatestSnapshotIndex());
|
||||
|
||||
try
|
||||
{
|
||||
if (disk->exists(path))
|
||||
return {path, disk};
|
||||
return std::make_shared<SnapshotFileInfo>(path, disk);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException(log);
|
||||
}
|
||||
}
|
||||
return {"", nullptr};
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -1,5 +1,6 @@
|
||||
#pragma once
|
||||
#include <Coordination/KeeperStorage.h>
|
||||
#include <Common/CopyableAtomic.h>
|
||||
#include <libnuraft/nuraft.hxx>
|
||||
|
||||
namespace DB
|
||||
@ -93,12 +94,20 @@ public:
|
||||
|
||||
struct SnapshotFileInfo
|
||||
{
|
||||
SnapshotFileInfo(std::string path_, DiskPtr disk_)
|
||||
: path(std::move(path_))
|
||||
, disk(std::move(disk_))
|
||||
{}
|
||||
|
||||
std::string path;
|
||||
DiskPtr disk;
|
||||
mutable std::atomic<size_t> size{0};
|
||||
};
|
||||
|
||||
using SnapshotFileInfoPtr = std::shared_ptr<SnapshotFileInfo>;
|
||||
|
||||
using KeeperStorageSnapshotPtr = std::shared_ptr<KeeperStorageSnapshot>;
|
||||
using CreateSnapshotCallback = std::function<SnapshotFileInfo(KeeperStorageSnapshotPtr &&, bool)>;
|
||||
using CreateSnapshotCallback = std::function<std::shared_ptr<SnapshotFileInfo>(KeeperStorageSnapshotPtr &&, bool)>;
|
||||
|
||||
using SnapshotMetaAndStorage = std::pair<SnapshotMetadataPtr, KeeperStoragePtr>;
|
||||
|
||||
@ -121,10 +130,10 @@ public:
|
||||
nuraft::ptr<nuraft::buffer> serializeSnapshotToBuffer(const KeeperStorageSnapshot & snapshot) const;
|
||||
|
||||
/// Serialize already compressed snapshot to disk (return path)
|
||||
SnapshotFileInfo serializeSnapshotBufferToDisk(nuraft::buffer & buffer, uint64_t up_to_log_idx);
|
||||
SnapshotFileInfoPtr serializeSnapshotBufferToDisk(nuraft::buffer & buffer, uint64_t up_to_log_idx);
|
||||
|
||||
/// Serialize snapshot directly to disk
|
||||
SnapshotFileInfo serializeSnapshotToDisk(const KeeperStorageSnapshot & snapshot);
|
||||
SnapshotFileInfoPtr serializeSnapshotToDisk(const KeeperStorageSnapshot & snapshot);
|
||||
|
||||
SnapshotDeserializationResult deserializeSnapshotFromBuffer(nuraft::ptr<nuraft::buffer> buffer) const;
|
||||
|
||||
@ -143,7 +152,7 @@ public:
|
||||
/// The most fresh snapshot log index we have
|
||||
size_t getLatestSnapshotIndex() const;
|
||||
|
||||
SnapshotFileInfo getLatestSnapshotInfo() const;
|
||||
SnapshotFileInfoPtr getLatestSnapshotInfo() const;
|
||||
|
||||
private:
|
||||
void removeOutdatedSnapshotsIfNeeded();
|
||||
@ -159,7 +168,7 @@ private:
|
||||
/// How many snapshots to keep before remove
|
||||
const size_t snapshots_to_keep;
|
||||
/// All existing snapshots in our path (log_index -> path)
|
||||
std::map<uint64_t, SnapshotFileInfo> existing_snapshots;
|
||||
std::map<uint64_t, SnapshotFileInfoPtr> existing_snapshots;
|
||||
/// Compress snapshots in common ZSTD format instead of custom ClickHouse block LZ4 format
|
||||
const bool compress_snapshots_zstd;
|
||||
/// Superdigest for deserialization of storage
|
||||
|
@ -147,7 +147,7 @@ std::shared_ptr<KeeperSnapshotManagerS3::S3Configuration> KeeperSnapshotManagerS
|
||||
|
||||
void KeeperSnapshotManagerS3::uploadSnapshotImpl(const SnapshotFileInfo & snapshot_file_info)
|
||||
{
|
||||
const auto & [snapshot_path, snapshot_disk] = snapshot_file_info;
|
||||
const auto & [snapshot_path, snapshot_disk, snapshot_size] = snapshot_file_info;
|
||||
try
|
||||
{
|
||||
auto s3_client = getSnapshotS3Client();
|
||||
@ -169,9 +169,9 @@ void KeeperSnapshotManagerS3::uploadSnapshotImpl(const SnapshotFileInfo & snapsh
|
||||
);
|
||||
};
|
||||
|
||||
LOG_INFO(log, "Will try to upload snapshot on {} to S3", snapshot_file_info.path);
|
||||
LOG_INFO(log, "Will try to upload snapshot on {} to S3", snapshot_path);
|
||||
|
||||
auto snapshot_file = snapshot_disk->readFile(snapshot_file_info.path);
|
||||
auto snapshot_file = snapshot_disk->readFile(snapshot_path);
|
||||
|
||||
auto snapshot_name = fs::path(snapshot_path).filename().string();
|
||||
auto lock_file = fmt::format(".{}_LOCK", snapshot_name);
|
||||
@ -261,31 +261,33 @@ void KeeperSnapshotManagerS3::snapshotS3Thread()
|
||||
|
||||
while (!shutdown_called)
|
||||
{
|
||||
SnapshotFileInfo snapshot_file_info;
|
||||
SnapshotFileInfoPtr snapshot_file_info;
|
||||
if (!snapshots_s3_queue.pop(snapshot_file_info))
|
||||
break;
|
||||
|
||||
if (shutdown_called)
|
||||
break;
|
||||
|
||||
uploadSnapshotImpl(snapshot_file_info);
|
||||
uploadSnapshotImpl(*snapshot_file_info);
|
||||
}
|
||||
}
|
||||
|
||||
void KeeperSnapshotManagerS3::uploadSnapshot(const SnapshotFileInfo & file_info, bool async_upload)
|
||||
void KeeperSnapshotManagerS3::uploadSnapshot(const SnapshotFileInfoPtr & file_info, bool async_upload)
|
||||
{
|
||||
chassert(file_info);
|
||||
|
||||
if (getSnapshotS3Client() == nullptr)
|
||||
return;
|
||||
|
||||
if (async_upload)
|
||||
{
|
||||
if (!snapshots_s3_queue.push(file_info))
|
||||
LOG_WARNING(log, "Failed to add snapshot {} to S3 queue", file_info.path);
|
||||
LOG_WARNING(log, "Failed to add snapshot {} to S3 queue", file_info->path);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
uploadSnapshotImpl(file_info);
|
||||
uploadSnapshotImpl(*file_info);
|
||||
}
|
||||
|
||||
void KeeperSnapshotManagerS3::startup(const Poco::Util::AbstractConfiguration & config, const MultiVersion<Macros>::Version & macros)
|
||||
|
@ -12,8 +12,6 @@
|
||||
#include <Common/ConcurrentBoundedQueue.h>
|
||||
#include <Common/ThreadPool.h>
|
||||
|
||||
|
||||
#include <string>
|
||||
#endif
|
||||
|
||||
namespace DB
|
||||
@ -27,13 +25,13 @@ public:
|
||||
|
||||
/// 'macros' are used to substitute macros in endpoint of disks
|
||||
void updateS3Configuration(const Poco::Util::AbstractConfiguration & config, const MultiVersion<Macros>::Version & macros);
|
||||
void uploadSnapshot(const SnapshotFileInfo & file_info, bool async_upload = true);
|
||||
void uploadSnapshot(const SnapshotFileInfoPtr & file_info, bool async_upload = true);
|
||||
|
||||
/// 'macros' are used to substitute macros in endpoint of disks
|
||||
void startup(const Poco::Util::AbstractConfiguration & config, const MultiVersion<Macros>::Version & macros);
|
||||
void shutdown();
|
||||
private:
|
||||
using SnapshotS3Queue = ConcurrentBoundedQueue<SnapshotFileInfo>;
|
||||
using SnapshotS3Queue = ConcurrentBoundedQueue<SnapshotFileInfoPtr>;
|
||||
SnapshotS3Queue snapshots_s3_queue;
|
||||
|
||||
/// Upload new snapshots to S3
|
||||
@ -63,7 +61,7 @@ public:
|
||||
KeeperSnapshotManagerS3() = default;
|
||||
|
||||
void updateS3Configuration(const Poco::Util::AbstractConfiguration &, const MultiVersion<Macros>::Version &) {}
|
||||
void uploadSnapshot(const SnapshotFileInfo &, [[maybe_unused]] bool async_upload = true) {}
|
||||
void uploadSnapshot(const SnapshotFileInfoPtr &, [[maybe_unused]] bool async_upload = true) {}
|
||||
|
||||
void startup(const Poco::Util::AbstractConfiguration &, const MultiVersion<Macros>::Version &) {}
|
||||
|
||||
|
@ -1,3 +1,4 @@
|
||||
#include <atomic>
|
||||
#include <cerrno>
|
||||
#include <Coordination/KeeperSnapshotManager.h>
|
||||
#include <Coordination/KeeperStateMachine.h>
|
||||
@ -90,8 +91,9 @@ void KeeperStateMachine::init()
|
||||
latest_snapshot_buf = snapshot_manager.deserializeSnapshotBufferFromDisk(latest_log_index);
|
||||
auto snapshot_deserialization_result = snapshot_manager.deserializeSnapshotFromBuffer(latest_snapshot_buf);
|
||||
latest_snapshot_info = snapshot_manager.getLatestSnapshotInfo();
|
||||
chassert(latest_snapshot_info);
|
||||
|
||||
if (isLocalDisk(*latest_snapshot_info.disk))
|
||||
if (isLocalDisk(*latest_snapshot_info->disk))
|
||||
latest_snapshot_buf = nullptr;
|
||||
|
||||
storage = std::move(snapshot_deserialization_result.storage);
|
||||
@ -603,7 +605,11 @@ void KeeperStateMachine::create_snapshot(nuraft::snapshot & s, nuraft::async_res
|
||||
}
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::KeeperSnapshotCreations);
|
||||
LOG_DEBUG(log, "Created persistent snapshot {} with path {}", latest_snapshot_meta->get_last_log_idx(), latest_snapshot_info.path);
|
||||
LOG_DEBUG(
|
||||
log,
|
||||
"Created persistent snapshot {} with path {}",
|
||||
latest_snapshot_meta->get_last_log_idx(),
|
||||
latest_snapshot_info->path);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -627,7 +633,7 @@ void KeeperStateMachine::create_snapshot(nuraft::snapshot & s, nuraft::async_res
|
||||
|
||||
when_done(ret, exception);
|
||||
|
||||
return ret ? latest_snapshot_info : SnapshotFileInfo{};
|
||||
return ret ? latest_snapshot_info : nullptr;
|
||||
};
|
||||
|
||||
if (keeper_context->getServerState() == KeeperContext::Phase::SHUTDOWN)
|
||||
@ -635,9 +641,9 @@ void KeeperStateMachine::create_snapshot(nuraft::snapshot & s, nuraft::async_res
|
||||
LOG_INFO(log, "Creating a snapshot during shutdown because 'create_snapshot_on_exit' is enabled.");
|
||||
auto snapshot_file_info = snapshot_task.create_snapshot(std::move(snapshot_task.snapshot), /*execute_only_cleanup=*/false);
|
||||
|
||||
if (!snapshot_file_info.path.empty() && snapshot_manager_s3)
|
||||
if (snapshot_file_info && snapshot_manager_s3)
|
||||
{
|
||||
LOG_INFO(log, "Uploading snapshot {} during shutdown because 'upload_snapshot_on_exit' is enabled.", snapshot_file_info.path);
|
||||
LOG_INFO(log, "Uploading snapshot {} during shutdown because 'upload_snapshot_on_exit' is enabled.", snapshot_file_info->path);
|
||||
snapshot_manager_s3->uploadSnapshot(snapshot_file_info, /* asnyc_upload */ false);
|
||||
}
|
||||
|
||||
@ -672,7 +678,7 @@ void KeeperStateMachine::save_logical_snp_obj(
|
||||
latest_snapshot_info = snapshot_manager.serializeSnapshotBufferToDisk(data, s.get_last_log_idx());
|
||||
latest_snapshot_meta = cloned_meta;
|
||||
latest_snapshot_buf = std::move(cloned_buffer);
|
||||
LOG_DEBUG(log, "Saved snapshot {} to path {}", s.get_last_log_idx(), latest_snapshot_info.path);
|
||||
LOG_DEBUG(log, "Saved snapshot {} to path {}", s.get_last_log_idx(), latest_snapshot_info->path);
|
||||
obj_id++;
|
||||
ProfileEvents::increment(ProfileEvents::KeeperSaveSnapshot);
|
||||
}
|
||||
@ -733,7 +739,7 @@ int KeeperStateMachine::read_logical_snp_obj(
|
||||
return -1;
|
||||
}
|
||||
|
||||
const auto & [path, disk] = latest_snapshot_info;
|
||||
const auto & [path, disk, size] = *latest_snapshot_info;
|
||||
if (isLocalDisk(*disk))
|
||||
{
|
||||
auto full_path = fs::path(disk->getPath()) / path;
|
||||
@ -862,12 +868,27 @@ uint64_t KeeperStateMachine::getKeyArenaSize() const
|
||||
return storage->getArenaDataSize();
|
||||
}
|
||||
|
||||
uint64_t KeeperStateMachine::getLatestSnapshotBufSize() const
|
||||
uint64_t KeeperStateMachine::getLatestSnapshotSize() const
|
||||
{
|
||||
std::lock_guard lock(snapshots_lock);
|
||||
if (latest_snapshot_buf)
|
||||
return latest_snapshot_buf->size();
|
||||
return 0;
|
||||
auto snapshot_info = [&]
|
||||
{
|
||||
std::lock_guard lock(snapshots_lock);
|
||||
return latest_snapshot_info;
|
||||
}();
|
||||
|
||||
if (snapshot_info == nullptr || snapshot_info->disk == nullptr)
|
||||
return 0;
|
||||
|
||||
/// there is a possibility multiple threads can try to get size
|
||||
/// this can happen in rare cases while it's not a heavy operation
|
||||
size_t size = snapshot_info->size.load(std::memory_order_relaxed);
|
||||
if (size == 0)
|
||||
{
|
||||
size = snapshot_info->disk->getFileSize(snapshot_info->path);
|
||||
snapshot_info->size.store(size, std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
return size;
|
||||
}
|
||||
|
||||
ClusterConfigPtr KeeperStateMachine::getClusterConfig() const
|
||||
|
@ -124,7 +124,7 @@ public:
|
||||
uint64_t getTotalEphemeralNodesCount() const;
|
||||
uint64_t getApproximateDataSize() const;
|
||||
uint64_t getKeyArenaSize() const;
|
||||
uint64_t getLatestSnapshotBufSize() const;
|
||||
uint64_t getLatestSnapshotSize() const;
|
||||
|
||||
void recalculateStorageStats();
|
||||
|
||||
@ -135,7 +135,7 @@ private:
|
||||
/// In our state machine we always have a single snapshot which is stored
|
||||
/// in memory in compressed (serialized) format.
|
||||
SnapshotMetadataPtr latest_snapshot_meta = nullptr;
|
||||
SnapshotFileInfo latest_snapshot_info;
|
||||
std::shared_ptr<SnapshotFileInfo> latest_snapshot_info;
|
||||
nuraft::ptr<nuraft::buffer> latest_snapshot_buf = nullptr;
|
||||
|
||||
/// Main state machine logic
|
||||
|
@ -17,7 +17,6 @@ node = cluster.add_instance(
|
||||
"node",
|
||||
main_configs=["configs/enable_keeper.xml"],
|
||||
stay_alive=True,
|
||||
with_zookeeper=True,
|
||||
)
|
||||
|
||||
|
||||
@ -211,3 +210,46 @@ def test_invalid_snapshot(started_cluster):
|
||||
node_zk.close()
|
||||
except:
|
||||
pass
|
||||
|
||||
|
||||
def test_snapshot_size(started_cluster):
|
||||
keeper_utils.wait_until_connected(started_cluster, node)
|
||||
node_zk = None
|
||||
try:
|
||||
node_zk = get_connection_zk("node")
|
||||
|
||||
node_zk.create("/test_state_size", b"somevalue")
|
||||
strs = []
|
||||
for i in range(100):
|
||||
strs.append(random_string(123).encode())
|
||||
node_zk.create("/test_state_size/node" + str(i), strs[i])
|
||||
|
||||
node_zk.stop()
|
||||
node_zk.close()
|
||||
|
||||
keeper_utils.send_4lw_cmd(started_cluster, node, "csnp")
|
||||
node.wait_for_log_line("Created persistent snapshot")
|
||||
|
||||
def get_snapshot_size():
|
||||
return int(
|
||||
next(
|
||||
filter(
|
||||
lambda line: "zk_latest_snapshot_size" in line,
|
||||
keeper_utils.send_4lw_cmd(started_cluster, node, "mntr").split(
|
||||
"\n"
|
||||
),
|
||||
)
|
||||
).split("\t")[1]
|
||||
)
|
||||
|
||||
assert get_snapshot_size() != 0
|
||||
restart_clickhouse()
|
||||
assert get_snapshot_size() != 0
|
||||
finally:
|
||||
try:
|
||||
if node_zk is not None:
|
||||
node_zk.stop()
|
||||
node_zk.close()
|
||||
|
||||
except:
|
||||
pass
|
||||
|
Loading…
Reference in New Issue
Block a user