Add initial version VFSLog with JSON serializer

This commit is contained in:
Aleksei Filatov 2024-07-24 12:45:21 +03:00
parent 74b870710f
commit 0c16dc0531
11 changed files with 394 additions and 127 deletions

View File

@ -13,7 +13,6 @@
namespace DB::ErrorCodes
{
extern const int BAD_ARGUMENTS;
//extern const int LOGICAL_ERROR;
extern const int ARGUMENT_OUT_OF_BOUND;
extern const int CORRUPTED_DATA;
extern const int INVALID_STATE;
@ -117,6 +116,12 @@ AppendLog::AppendLog(const fs::path & log_dir_, const Settings & settings_) : lo
load();
}
UUID AppendLog::getID() const
{
std::lock_guard lock(mutex);
return log_id;
}
void AppendLog::load()
{

View File

@ -1,5 +1,7 @@
#pragma once
#include <IO/WriteBufferFromFile.h>
#include <Core/Types.h>
#include <filesystem>
#include <mutex>
@ -57,6 +59,8 @@ public:
UInt64 getStartIndex() const;
/// Return the number of segments the log contains
size_t segmentsCount() const;
/// Return unique log ID
UUID getID() const;
private:
/// Load all segments from the log directory and clean outdated segments
@ -83,6 +87,7 @@ private:
std::unique_ptr<WriteBufferFromFile> log_file TSA_GUARDED_BY(mutex);
size_t active_segment_size TSA_GUARDED_BY(mutex) = 0;
UUID log_id TSA_GUARDED_BY(mutex) {};
/// Index of the next entry to append
UInt64 next_index TSA_GUARDED_BY(mutex) = 0;
/// Log working directory

View File

@ -0,0 +1,117 @@
#pragma once
#include <Common/Exception.h>
#include <Disks/ObjectStorages/VFS/VFSLog.h>
#include <IO/ReadHelpers.h>
#include <Poco/JSON/JSON.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Parser.h>
#include <Poco/JSON/Stringifier.h>
#include <variant>
#include <vector>
namespace DB
{
namespace ErrorCodes
{
extern const int CORRUPTED_DATA;
}
class JSONSerializer
{
public:
static std::vector<char> serialize(const VFSEvent & event)
{
Poco::JSON::Object object;
serializeToJSON(event, object);
String str = to_string(object);
return {str.begin(), str.end()};
}
static VFSEvent deserialize(std::span<char> buffer)
{
String str(buffer.data(), buffer.size());
auto object = from_string(std::move(str));
if (!object)
throw Exception(ErrorCodes::CORRUPTED_DATA, "Cannot parse VFS log item buffer as JSON");
VFSEvent event;
deserializeFromJSON(event, *object);
return event;
}
private:
template <typename Value>
static Value getOrThrow(const Poco::JSON::Object & object, const String & key)
{
if (!object.has(key))
throw Exception(ErrorCodes::CORRUPTED_DATA, "Key {} is not found in VFS log item", key);
return object.getValue<Value>(key);
}
static String to_string(const Poco::JSON::Object & object)
{
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
oss.exceptions(std::ios::failbit);
Poco::JSON::Stringifier::stringify(object, oss);
return oss.str();
}
static Poco::JSON::Object::Ptr from_string(const String & json_str)
{
Poco::JSON::Parser parser;
return parser.parse(json_str).extract<Poco::JSON::Object::Ptr>();
}
static void serializeToJSON(const VFSEvent & event, Poco::JSON::Object & object)
{
object.set("remote_path", event.remote_path);
object.set("local_path", event.local_path);
object.set("action", static_cast<std::underlying_type_t<VFSAction>>(event.action));
object.set("timestamp", event.timestamp.epochMicroseconds());
if (event.orig_wal.has_value())
{
Poco::JSON::Object orig_wal;
serializeToJSON(*event.orig_wal, orig_wal);
object.set("orig_wal", orig_wal);
}
}
static void serializeToJSON(const WALInfo & orig_wal, Poco::JSON::Object & object)
{
object.set("index", orig_wal.index);
object.set("id", toString(orig_wal.id));
object.set("replica", orig_wal.replica);
}
static void deserializeFromJSON(VFSEvent & event, const Poco::JSON::Object & json)
{
event.remote_path = getOrThrow<String>(json, "remote_path");
event.local_path = getOrThrow<String>(json, "local_path");
event.action = static_cast<VFSAction>(getOrThrow<std::underlying_type_t<VFSAction>>(json, "action"));
event.timestamp = getOrThrow<UInt64>(json, "timestamp");
if (auto orig_wal = json.getObject("orig_wal"); orig_wal)
deserializeFromJSON(event.orig_wal.emplace(), *orig_wal);
}
static void deserializeFromJSON(WALInfo & orig_wal, const Poco::JSON::Object & json)
{
orig_wal.id = parseFromString<UUID>(getOrThrow<String>(json, "id"));
orig_wal.index = getOrThrow<UInt64>(json, "index");
orig_wal.replica = getOrThrow<String>(json, "replica");
}
};
}

View File

@ -35,13 +35,13 @@ extern const char vfs_gc_optimistic_lock_delay[];
VFSGarbageCollector::VFSGarbageCollector(
const String & gc_name_,
ObjectStoragePtr object_storage_,
WAL::AppendLog & alog_,
VFSLog & wal_,
BackgroundSchedulePool & pool,
const GarbageCollectorSettings & settings_)
: gc_name(gc_name_)
, object_storage(object_storage_)
, vfs_shapshot_data({object_storage, gc_name})
, alog(alog_)
, vfs_shapshot_data(object_storage, gc_name)
, wal(wal_)
, settings(settings_)
, log(getLogger(fmt::format("VFSGC({})", gc_name)))
{
@ -139,7 +139,7 @@ void VFSGarbageCollector::updateShapshotMetadata(const SnapshotMetadata & new_sn
void VFSGarbageCollector::updateSnapshot()
{
SnapshotMetadata snapshot_meta = getSnapshotMetadata();
auto [wal_items_batch, max_entry_index] = getWalItems(alog, settings.batch_size);
auto wal_items_batch = wal.read(settings.batch_size);
if (wal_items_batch.size() == 0ul)
{
LOG_DEBUG(log, "Merge snapshot exit due to empty wal.");
@ -147,10 +147,10 @@ void VFSGarbageCollector::updateSnapshot()
}
LOG_DEBUG(log, "Merge snapshot with {} entries from wal.", wal_items_batch.size());
auto new_snaphot_meta = vfs_shapshot_data.mergeWithWals(wal_items_batch, snapshot_meta);
auto new_snaphot_meta = vfs_shapshot_data.mergeWithWals(std::move(wal_items_batch), snapshot_meta);
updateShapshotMetadata(new_snaphot_meta, snapshot_meta.znode_version);
alog.dropUpTo(max_entry_index + 1);
wal.dropUpTo(wal_items_batch.back().wal.index + 1);
LOG_DEBUG(log, "Snapshot update finished with new shapshot key {}", new_snaphot_meta.object_storage_key);
}

View File

@ -34,7 +34,7 @@ public:
VFSGarbageCollector(
const String & gc_name_,
ObjectStoragePtr object_storage_,
WAL::AppendLog & alog,
VFSLog & wal_,
BackgroundSchedulePool & pool,
const GarbageCollectorSettings & settings_);
@ -54,7 +54,7 @@ private:
String gc_name;
ObjectStoragePtr object_storage;
VFSSnapshotDataFromObjectStorage vfs_shapshot_data;
WAL::AppendLog & alog;
VFSLog & wal;
const GarbageCollectorSettings settings;
LoggerPtr log;

View File

@ -0,0 +1,53 @@
#include "VFSLog.h"
#include <Disks/ObjectStorages/VFS/JSONSerializer.h>
#include <Common/Concepts.h>
namespace DB
{
VFSLog::VFSLog(const String & log_dir) : wal(log_dir) // TODO: read settings from config
{
}
UInt64 VFSLog::write(const VFSEvent & event)
{
return wal.append(JSONSerializer::serialize(event));
}
VFSLogItems VFSLog::read(size_t count) const
{
VFSLogItems vfs_items;
WAL::Entries wal_items = wal.readFront(count);
auto wal_id = wal.getID(); // TODO: It makes sense to return wal id from readFront
vfs_items.reserve(wal_items.size());
std::ranges::transform(
wal_items,
std::back_inserter(vfs_items),
[&wal_id](auto && wal_item) -> VFSLogItem
{
VFSLogItem item;
item.event = JSONSerializer::deserialize(wal_item.data);
item.wal.index = wal_item.index;
item.wal.id = wal_id;
return item;
});
return vfs_items;
}
size_t VFSLog::dropUpTo(UInt64 index)
{
return wal.dropUpTo(index);
}
size_t VFSLog::size() const
{
return wal.size();
}
}

View File

@ -0,0 +1,65 @@
#pragma once
#include <Core/UUID.h>
#include <Disks/ObjectStorages/VFS/AppendLog.h>
#include <variant>
namespace DB
{
enum class VFSAction : uint8_t
{
LINK,
UNLINK,
REQUEST
};
struct WALInfo
{
String replica;
UUID id;
UInt64 index;
bool operator==(const WALInfo &) const = default;
};
struct VFSEvent
{
String remote_path;
String local_path;
std::optional<WALInfo> orig_wal;
Poco::Timestamp timestamp;
VFSAction action;
bool operator==(const VFSEvent &) const = default;
};
struct VFSLogItem
{
VFSEvent event;
WALInfo wal;
bool operator==(const VFSLogItem &) const = default;
};
using VFSLogItems = std::vector<VFSLogItem>;
class VFSLog
{
public:
VFSLog(const String & log_dir);
UInt64 write(const VFSEvent & event);
VFSLogItems read(size_t count) const;
size_t dropUpTo(UInt64 index);
size_t size() const;
private:
WAL::AppendLog wal;
};
using VFSLogPtr = std::shared_ptr<VFSLog>;
}

View File

@ -20,25 +20,6 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
/// tmp start
std::pair<WALItems, UInt64> getWalItems(WAL::AppendLog & alog, size_t batch_size)
{
auto alog_entries = alog.readFront(batch_size);
WALItems result;
UInt64 max_index = 0;
result.reserve(alog_entries.size());
for (const auto & entry : alog_entries)
{
String data(entry.data.begin(), entry.data.end());
result.emplace_back(WALItem::deserialize(data));
max_index = std::max(max_index, entry.index);
}
return {result, max_index};
}
/// tmp end
bool VFSSnapshotEntry::operator==(const VFSSnapshotEntry & entry) const
{
return remote_path == entry.remote_path && link_count == entry.link_count;
@ -83,7 +64,7 @@ void VFSSnapshotDataBase::writeEntryInSnaphot(
}
SnapshotMetadata VFSSnapshotDataBase::mergeWithWals(WALItems & wal_items, const SnapshotMetadata & old_snapshot_meta)
SnapshotMetadata VFSSnapshotDataBase::mergeWithWals(VFSLogItems && wal_items, const SnapshotMetadata & old_snapshot_meta)
{
/// For most of object stroges (like s3 or azure) we don't need the object path, it's generated randomly.
/// But other ones reqiested to set it manually.
@ -91,7 +72,7 @@ SnapshotMetadata VFSSnapshotDataBase::mergeWithWals(WALItems & wal_items, const
auto [new_shapshot_write_buffer, new_object_key] = getShapshotWriteBufferAndSnaphotObject(old_snapshot_meta);
LOG_DEBUG(log, "Going to merge WAL batch(size {}) with snapshot (key {})", wal_items.size(), old_snapshot_meta.object_storage_key);
auto entires_to_remove = mergeWithWalsImpl(wal_items, *shapshot_read_buffer, *new_shapshot_write_buffer);
auto entires_to_remove = mergeWithWalsImpl(std::move(wal_items), *shapshot_read_buffer, *new_shapshot_write_buffer);
SnapshotMetadata new_snaphot_meta(new_object_key);
LOG_DEBUG(log, "Merge snapshot have finished, going to remove {} from object storage", entires_to_remove.size());
@ -100,14 +81,13 @@ SnapshotMetadata VFSSnapshotDataBase::mergeWithWals(WALItems & wal_items, const
}
VFSSnapshotEntries VFSSnapshotDataBase::mergeWithWalsImpl(WALItems & wal_items, ReadBuffer & read_buffer, WriteBuffer & write_buffer)
VFSSnapshotEntries VFSSnapshotDataBase::mergeWithWalsImpl(VFSLogItems && wal_items_, ReadBuffer & read_buffer, WriteBuffer & write_buffer)
{
VFSSnapshotEntries entries_to_remove;
std::sort(
wal_items.begin(),
wal_items.end(),
[](const WALItem & left, const WALItem & right) { return left.remote_path < right.remote_path; });
auto wal_items = std::move(wal_items_);
std::ranges::sort(
wal_items, [](const VFSLogItem & left, const VFSLogItem & right) { return left.event.remote_path < right.event.remote_path; });
auto current_snapshot_entry = VFSSnapshotEntry::deserialize(read_buffer);
@ -119,10 +99,22 @@ VFSSnapshotEntries VFSSnapshotDataBase::mergeWithWalsImpl(WALItems & wal_items,
for (auto wal_iterator = wal_items.begin(); wal_iterator != wal_items.end();)
{
// Combine all wal items with the same remote_path into single one.
VFSSnapshotEntry entry_to_merge = {wal_iterator->remote_path, 0};
while (wal_iterator != wal_items.end() && wal_iterator->remote_path == entry_to_merge.remote_path)
VFSSnapshotEntry entry_to_merge = {wal_iterator->event.remote_path, 0};
while (wal_iterator != wal_items.end() && wal_iterator->event.remote_path == entry_to_merge.remote_path)
{
entry_to_merge.link_count += wal_iterator->delta_link_count;
int delta_link_count = 0; // TODO: remove delta and save local_path in the snapshot
switch (wal_iterator->event.action)
{
case VFSAction::LINK:
case VFSAction::REQUEST:
delta_link_count = 1;
break;
case VFSAction::UNLINK:
delta_link_count = -1;
break;
}
entry_to_merge.link_count += delta_link_count;
++wal_iterator;
}

View File

@ -3,6 +3,7 @@
#include "AppendLog.h"
#include "IO/ReadHelpers.h"
#include "VFSShapshotMetadata.h"
#include "VFSLog.h"
#include <fstream>
#include <optional>
@ -15,39 +16,6 @@
namespace DB
{
/////// START OF TMP
struct WALItem
{
String remote_path;
Int64 delta_link_count;
String status;
time_t last_update_timestamp;
UInt64 last_update_wal_pointer;
UInt64 wal_uuid;
WALItem(String path, Int64 delta)
: remote_path(path), delta_link_count(delta), status(""), last_update_timestamp(0), last_update_wal_pointer(0), wal_uuid(0)
{
}
WALItem() : remote_path(""), delta_link_count(0), status(""), last_update_timestamp(0), last_update_wal_pointer(0), wal_uuid(0) { }
static WALItem deserialize(const String & str)
{
ReadBufferFromString rb(str);
WALItem item;
readStringUntilWhitespace(item.remote_path, rb);
checkChar(' ', rb);
readIntTextUnsafe(item.delta_link_count, rb);
return item;
}
String serialize() const { return fmt::format("{} {}", remote_path, delta_link_count); }
};
using WALItems = std::vector<WALItem>;
std::pair<WALItems, UInt64> getWalItems(WAL::AppendLog & alog, size_t batch_size);
/////// END OF TMP
struct VFSSnapshotEntry
{
String remote_path;
@ -66,12 +34,15 @@ public:
VFSSnapshotDataBase() : log(getLogger("VFSSnapshotDataBase(unnamed)")) { }
virtual ~VFSSnapshotDataBase() = default;
VFSSnapshotDataBase(const VFSSnapshotDataBase &) = delete;
VFSSnapshotDataBase(VFSSnapshotDataBase &&) = delete;
/// Apply wal on the current snapshot and returns metadata of the updated one.
SnapshotMetadata mergeWithWals(WALItems & wal_items, const SnapshotMetadata & old_snapshot_meta);
SnapshotMetadata mergeWithWals(VFSLogItems && wal_items, const SnapshotMetadata & old_snapshot_meta);
private:
void writeEntryInSnaphot(const VFSSnapshotEntry & entry, WriteBuffer & write_buffer, VFSSnapshotEntries & entries_to_remove);
VFSSnapshotEntries mergeWithWalsImpl(WALItems & wal_items, ReadBuffer & read_buffer, WriteBuffer & write_buffer);
VFSSnapshotEntries mergeWithWalsImpl(VFSLogItems && wal_items, ReadBuffer & read_buffer, WriteBuffer & write_buffer);
LoggerPtr log;
@ -86,7 +57,6 @@ protected:
class VFSSnapshotDataFromObjectStorage : public VFSSnapshotDataBase
{
public:
VFSSnapshotDataFromObjectStorage() = delete;
VFSSnapshotDataFromObjectStorage(ObjectStoragePtr object_storage_, const String & name_)
: object_storage(object_storage_), log(getLogger(fmt::format("VFSSnapshotDataFromObjectStorage({})", name_)))
{
@ -100,6 +70,7 @@ protected:
private:
ObjectStoragePtr object_storage;
LoggerPtr log;
};
}

View File

@ -74,78 +74,77 @@ TEST(DiskObjectStorageVFS, SnaphotEntriesSerialization)
EXPECT_EQ(deserialized, entries);
}
TEST(DiskObjectStorageVFS, ExceptionOnUnsortedSnahot)
TEST(DiskObjectStorageVFS, ExceptionOnUnsortedSnapshot)
{
VFSSnapshotEntries entries = {{"/b", 1}, {"/a", 1}, {"/c", 2}};
VFSSnapshotDataFromString snapshot_data;
snapshot_data.setSnaphotData(convertSnaphotEntiesToString(entries));
SnapshotMetadata placeholder_metadata;
WALItems empty_wals;
EXPECT_THROW(auto new_snapshot = snapshot_data.mergeWithWals(empty_wals, placeholder_metadata), Exception) << "Snaphot is unsorted";
EXPECT_THROW(auto new_snapshot = snapshot_data.mergeWithWals({}, placeholder_metadata), Exception) << "Snaphot is unsorted";
}
TEST(DiskObjectStorageVFS, MergeWalWithSnaphot)
{
WALItems wals = {{"/c", 1}, {"/d", 2}, {"/b", 1}};
// TEST(DiskObjectStorageVFS, MergeWalWithSnaphot)
// {
// WALItems wals = {{"/c", 1}, {"/d", 2}, {"/b", 1}};
String expected_snaphot_state;
VFSSnapshotDataFromString snapshot_data;
SnapshotMetadata placeholder_metadata;
{
snapshot_data.setSnaphotData("");
snapshot_data.mergeWithWals(wals, placeholder_metadata);
// String expected_snaphot_state;
// VFSSnapshotDataFromString snapshot_data;
// SnapshotMetadata placeholder_metadata;
// {
// snapshot_data.setSnaphotData("");
// snapshot_data.mergeWithWals(wals, placeholder_metadata);
auto to_remove = snapshot_data.getEnriesToRemoveAfterLastMerge();
VFSSnapshotEntries expected_entries_to_remove = {};
// auto to_remove = snapshot_data.getEnriesToRemoveAfterLastMerge();
// VFSSnapshotEntries expected_entries_to_remove = {};
EXPECT_EQ(expected_entries_to_remove, to_remove);
}
expected_snaphot_state = "/b 1\n/c 1\n/d 2\n";
EXPECT_EQ(expected_snaphot_state, snapshot_data.getSnapshotdata());
// EXPECT_EQ(expected_entries_to_remove, to_remove);
// }
// expected_snaphot_state = "/b 1\n/c 1\n/d 2\n";
// EXPECT_EQ(expected_snaphot_state, snapshot_data.getSnapshotdata());
wals = {{"/c", -1}, {"/d", -1}, {"/d", -1}, {"/e", -1}, {"/e", 1}, {"/f", 1}, {"/f", 1}, {"/f", -1}, {"/f", 1}};
{
snapshot_data.setSnaphotData(expected_snaphot_state);
snapshot_data.mergeWithWals(wals, placeholder_metadata);
// wals = {{"/c", -1}, {"/d", -1}, {"/d", -1}, {"/e", -1}, {"/e", 1}, {"/f", 1}, {"/f", 1}, {"/f", -1}, {"/f", 1}};
// {
// snapshot_data.setSnaphotData(expected_snaphot_state);
// snapshot_data.mergeWithWals(wals, placeholder_metadata);
auto to_remove = snapshot_data.getEnriesToRemoveAfterLastMerge();
VFSSnapshotEntries expected_entries_to_remove = {{"/c", 0}, {"/d", 0}, {"/e", 0}};
// auto to_remove = snapshot_data.getEnriesToRemoveAfterLastMerge();
// VFSSnapshotEntries expected_entries_to_remove = {{"/c", 0}, {"/d", 0}, {"/e", 0}};
EXPECT_EQ(expected_entries_to_remove, to_remove);
}
expected_snaphot_state = "/b 1\n/f 2\n";
EXPECT_EQ(expected_snaphot_state, snapshot_data.getSnapshotdata());
// EXPECT_EQ(expected_entries_to_remove, to_remove);
// }
// expected_snaphot_state = "/b 1\n/f 2\n";
// EXPECT_EQ(expected_snaphot_state, snapshot_data.getSnapshotdata());
wals = {
{"/a", 1},
{"/b", 1},
{"/c", 1},
};
// wals = {
// {"/a", 1},
// {"/b", 1},
// {"/c", 1},
// };
{
snapshot_data.setSnaphotData(expected_snaphot_state);
// {
// snapshot_data.setSnaphotData(expected_snaphot_state);
snapshot_data.mergeWithWals(wals, placeholder_metadata);
auto to_remove = snapshot_data.getEnriesToRemoveAfterLastMerge();
VFSSnapshotEntries expected_entries_to_remove;
// snapshot_data.mergeWithWals(wals, placeholder_metadata);
// auto to_remove = snapshot_data.getEnriesToRemoveAfterLastMerge();
// VFSSnapshotEntries expected_entries_to_remove;
EXPECT_EQ(expected_entries_to_remove, to_remove);
}
expected_snaphot_state = "/a 1\n/b 2\n/c 1\n/f 2\n";
EXPECT_EQ(expected_snaphot_state, snapshot_data.getSnapshotdata());
// EXPECT_EQ(expected_entries_to_remove, to_remove);
// }
// expected_snaphot_state = "/a 1\n/b 2\n/c 1\n/f 2\n";
// EXPECT_EQ(expected_snaphot_state, snapshot_data.getSnapshotdata());
wals = {{"/a", -1}, {"/b", -2}, {"/c", -1}, {"/f", -2}};
{
snapshot_data.setSnaphotData(expected_snaphot_state);
snapshot_data.mergeWithWals(wals, placeholder_metadata);
// wals = {{"/a", -1}, {"/b", -2}, {"/c", -1}, {"/f", -2}};
// {
// snapshot_data.setSnaphotData(expected_snaphot_state);
// snapshot_data.mergeWithWals(wals, placeholder_metadata);
auto to_remove = snapshot_data.getEnriesToRemoveAfterLastMerge();
VFSSnapshotEntries expected_entries_to_remove = {{"/a", 0}, {"/b", 0}, {"/c", 0}, {"/f", 0}};
// auto to_remove = snapshot_data.getEnriesToRemoveAfterLastMerge();
// VFSSnapshotEntries expected_entries_to_remove = {{"/a", 0}, {"/b", 0}, {"/c", 0}, {"/f", 0}};
EXPECT_EQ(expected_entries_to_remove, to_remove);
}
expected_snaphot_state = "";
EXPECT_EQ(expected_snaphot_state, snapshot_data.getSnapshotdata());
}
// EXPECT_EQ(expected_entries_to_remove, to_remove);
// }
// expected_snaphot_state = "";
// EXPECT_EQ(expected_snaphot_state, snapshot_data.getSnapshotdata());
// }

View File

@ -0,0 +1,60 @@
#include <gtest/gtest.h>
#include <Disks/DiskLocal.h>
#include <Disks/ObjectStorages/VFS/JSONSerializer.h>
#include <Disks/ObjectStorages/VFS/VFSLog.h>
#include <Poco/Checksum.h>
#include <filesystem>
using namespace DB;
namespace fs = std::filesystem;
const fs::path log_dir = "tmp/";
const String remote_path = "/cloud_storage/remote/path";
const String local_path = "/table/path/to/part/file";
class VFSLogTest : public ::testing::Test
{
protected:
void SetUp() override { fs::remove_all(log_dir); }
};
TEST_F(VFSLogTest, Serialization)
{
{
VFSEvent event_in{remote_path, local_path, {}, Poco::Timestamp(), VFSAction::LINK};
auto buf = JSONSerializer::serialize(event_in);
VFSEvent event_out = JSONSerializer::deserialize(buf);
EXPECT_EQ(event_in, event_out);
}
{
VFSEvent event_in{remote_path, local_path, WALInfo{"src_replica", UUIDHelpers::generateV4(), 1}, Poco::Timestamp(), VFSAction::REQUEST};
auto buf = JSONSerializer::serialize(event_in);
VFSEvent event_out = JSONSerializer::deserialize(buf);
EXPECT_EQ(event_in, event_out);
}
}
TEST_F(VFSLogTest, Main)
{
DB::VFSLog vfs_log(log_dir);
VFSEvent event{remote_path, local_path, {}, Poco::Timestamp(), VFSAction::LINK};
vfs_log.write(event);
EXPECT_EQ(vfs_log.size(), 1);
VFSLogItems items = vfs_log.read(1);
EXPECT_EQ(items.size(), 1);
EXPECT_EQ(items[0].wal.index, 0);
vfs_log.dropUpTo(1);
EXPECT_EQ(vfs_log.size(), 0);
}