Merge pull request #70915 from jkartseva/reduce-api-calls-cost

Cache HEAD API requests to object storage in the plain_rewritable disk
This commit is contained in:
Julia Kartseva 2024-10-24 23:58:13 +00:00 committed by GitHub
commit 2e61f2bb99
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 236 additions and 99 deletions

View File

@ -1,5 +1,5 @@
#include <Disks/ObjectStorages/CommonPathPrefixKeyGenerator.h>
#include <Disks/ObjectStorages/InMemoryPathMap.h>
#include <Disks/ObjectStorages/InMemoryDirectoryPathMap.h>
#include <Common/SharedLockGuard.h>
#include <Common/getRandomASCIIString.h>
@ -11,7 +11,7 @@
namespace DB
{
CommonPathPrefixKeyGenerator::CommonPathPrefixKeyGenerator(String key_prefix_, std::weak_ptr<InMemoryPathMap> path_map_)
CommonPathPrefixKeyGenerator::CommonPathPrefixKeyGenerator(String key_prefix_, std::weak_ptr<InMemoryDirectoryPathMap> path_map_)
: storage_key_prefix(key_prefix_), path_map(std::move(path_map_))
{
}
@ -59,7 +59,7 @@ std::tuple<std::string, std::vector<std::string>> CommonPathPrefixKeyGenerator::
if (it != ptr->map.end())
{
std::vector<std::string> vec(std::make_move_iterator(dq.begin()), std::make_move_iterator(dq.end()));
return std::make_tuple(it->second, std::move(vec));
return std::make_tuple(it->second.path, std::move(vec));
}
if (!p.filename().empty())

View File

@ -20,13 +20,13 @@ namespace DB
/// The key generator ensures that the original directory hierarchy is
/// preserved, which is required for the MergeTree family.
struct InMemoryPathMap;
struct InMemoryDirectoryPathMap;
class CommonPathPrefixKeyGenerator : public IObjectStorageKeysGenerator
{
public:
/// Local to remote path map. Leverages filesystem::path comparator for paths.
explicit CommonPathPrefixKeyGenerator(String key_prefix_, std::weak_ptr<InMemoryPathMap> path_map_);
explicit CommonPathPrefixKeyGenerator(String key_prefix_, std::weak_ptr<InMemoryDirectoryPathMap> path_map_);
ObjectStorageKey generate(const String & path, bool is_directory, const std::optional<String> & key_prefix) const override;
@ -36,7 +36,7 @@ private:
const String storage_key_prefix;
std::weak_ptr<InMemoryPathMap> path_map;
std::weak_ptr<InMemoryDirectoryPathMap> path_map;
};
}

View File

@ -1,5 +1,5 @@
#include "FlatDirectoryStructureKeyGenerator.h"
#include <Disks/ObjectStorages/InMemoryPathMap.h>
#include <Disks/ObjectStorages/InMemoryDirectoryPathMap.h>
#include "Common/ObjectStorageKey.h"
#include <Common/SharedLockGuard.h>
#include <Common/SharedMutex.h>
@ -12,7 +12,8 @@
namespace DB
{
FlatDirectoryStructureKeyGenerator::FlatDirectoryStructureKeyGenerator(String storage_key_prefix_, std::weak_ptr<InMemoryPathMap> path_map_)
FlatDirectoryStructureKeyGenerator::FlatDirectoryStructureKeyGenerator(
String storage_key_prefix_, std::weak_ptr<InMemoryDirectoryPathMap> path_map_)
: storage_key_prefix(storage_key_prefix_), path_map(std::move(path_map_))
{
}
@ -31,11 +32,11 @@ ObjectStorageKey FlatDirectoryStructureKeyGenerator::generate(const String & pat
SharedLockGuard lock(ptr->mutex);
auto it = ptr->map.find(p);
if (it != ptr->map.end())
return ObjectStorageKey::createAsRelative(key_prefix.has_value() ? *key_prefix : storage_key_prefix, it->second);
return ObjectStorageKey::createAsRelative(key_prefix.has_value() ? *key_prefix : storage_key_prefix, it->second.path);
it = ptr->map.find(directory);
if (it != ptr->map.end())
remote_path = it->second;
remote_path = it->second.path;
}
constexpr size_t part_size = 32;
std::filesystem::path key = remote_path.has_value() ? *remote_path

View File

@ -6,18 +6,18 @@
namespace DB
{
struct InMemoryPathMap;
struct InMemoryDirectoryPathMap;
class FlatDirectoryStructureKeyGenerator : public IObjectStorageKeysGenerator
{
public:
explicit FlatDirectoryStructureKeyGenerator(String storage_key_prefix_, std::weak_ptr<InMemoryPathMap> path_map_);
explicit FlatDirectoryStructureKeyGenerator(String storage_key_prefix_, std::weak_ptr<InMemoryDirectoryPathMap> path_map_);
ObjectStorageKey generate(const String & path, bool is_directory, const std::optional<String> & key_prefix) const override;
private:
const String storage_key_prefix;
std::weak_ptr<InMemoryPathMap> path_map;
std::weak_ptr<InMemoryDirectoryPathMap> path_map;
};
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <memory>
#include <optional>
#include <vector>
#include <unordered_map>
#include <Poco/Timestamp.h>
@ -197,6 +198,13 @@ public:
virtual Poco::Timestamp getLastModified(const std::string & path) const = 0;
virtual std::optional<Poco::Timestamp> getLastModifiedIfExists(const std::string & path) const
{
if (existsFileOrDirectory(path))
return getLastModified(path);
return std::nullopt;
}
virtual time_t getLastChanged(const std::string & /* path */) const
{
throwNotImplemented();

View File

@ -2,14 +2,17 @@
#include <filesystem>
#include <map>
#include <optional>
#include <shared_mutex>
#include <base/defines.h>
#include <Common/SharedLockGuard.h>
#include <Common/SharedMutex.h>
namespace DB
{
struct InMemoryPathMap
struct InMemoryDirectoryPathMap
{
struct PathComparator
{
@ -22,8 +25,27 @@ struct InMemoryPathMap
return path1 < path2;
}
};
/// Local -> Remote path.
using Map = std::map<std::filesystem::path, std::string, PathComparator>;
struct RemotePathInfo
{
std::string path;
time_t last_modified = 0;
};
using Map = std::map<std::filesystem::path, RemotePathInfo, PathComparator>;
std::optional<RemotePathInfo> getRemotePathInfoIfExists(const std::string & path)
{
auto base_path = path;
if (base_path.ends_with('/'))
base_path.pop_back();
SharedLockGuard lock(mutex);
auto it = map.find(base_path);
if (it == map.end())
return std::nullopt;
return it->second;
}
mutable SharedMutex mutex;
#ifdef OS_LINUX

View File

@ -116,7 +116,8 @@ void registerPlainMetadataStorage(MetadataStorageFactory & factory)
ObjectStoragePtr object_storage) -> MetadataStoragePtr
{
auto key_compatibility_prefix = getObjectKeyCompatiblePrefix(*object_storage, config, config_prefix);
return std::make_shared<MetadataStorageFromPlainObjectStorage>(object_storage, key_compatibility_prefix, config.getUInt64(config_prefix + ".file_sizes_cache_size", 0));
return std::make_shared<MetadataStorageFromPlainObjectStorage>(
object_storage, key_compatibility_prefix, config.getUInt64(config_prefix + ".object_metadata_cache_size", 0));
});
}
@ -130,7 +131,8 @@ void registerPlainRewritableMetadataStorage(MetadataStorageFactory & factory)
ObjectStoragePtr object_storage) -> MetadataStoragePtr
{
auto key_compatibility_prefix = getObjectKeyCompatiblePrefix(*object_storage, config, config_prefix);
return std::make_shared<MetadataStorageFromPlainRewritableObjectStorage>(object_storage, key_compatibility_prefix, config.getUInt64(config_prefix + ".file_sizes_cache_size", 0));
return std::make_shared<MetadataStorageFromPlainRewritableObjectStorage>(
object_storage, key_compatibility_prefix, config.getUInt64(config_prefix + ".object_metadata_cache_size", 0));
});
}

View File

@ -1,15 +1,22 @@
#include "MetadataStorageFromPlainObjectStorage.h"
#include <Disks/IDisk.h>
#include <Disks/ObjectStorages/InMemoryPathMap.h>
#include <Disks/ObjectStorages/IObjectStorage.h>
#include <Disks/ObjectStorages/InMemoryDirectoryPathMap.h>
#include <Disks/ObjectStorages/MetadataStorageFromPlainObjectStorageOperations.h>
#include <Disks/ObjectStorages/StaticDirectoryIterator.h>
#include <Disks/ObjectStorages/StoredObject.h>
#include <Common/ObjectStorageKey.h>
#include <Common/SipHash.h>
#include <Common/filesystemHelpers.h>
#include <filesystem>
#include <memory>
#include <optional>
#include <tuple>
#include <unordered_set>
#include <Poco/Timestamp.h>
namespace DB
@ -30,12 +37,12 @@ std::filesystem::path normalizeDirectoryPath(const std::filesystem::path & path)
}
MetadataStorageFromPlainObjectStorage::MetadataStorageFromPlainObjectStorage(ObjectStoragePtr object_storage_, String storage_path_prefix_, size_t file_sizes_cache_size)
: object_storage(object_storage_)
, storage_path_prefix(std::move(storage_path_prefix_))
MetadataStorageFromPlainObjectStorage::MetadataStorageFromPlainObjectStorage(
ObjectStoragePtr object_storage_, String storage_path_prefix_, size_t object_metadata_cache_size)
: object_storage(object_storage_), storage_path_prefix(std::move(storage_path_prefix_))
{
if (file_sizes_cache_size)
file_sizes_cache.emplace(file_sizes_cache_size);
if (object_metadata_cache_size)
object_metadata_cache.emplace(object_metadata_cache_size);
}
MetadataTransactionPtr MetadataStorageFromPlainObjectStorage::createTransaction()
@ -82,28 +89,29 @@ uint64_t MetadataStorageFromPlainObjectStorage::getFileSize(const String & path)
{
if (auto res = getFileSizeIfExists(path))
return *res;
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "File {} does not exist on plain object storage", path);
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "File {} does not exist on {}", path, object_storage->getName());
}
std::optional<uint64_t> MetadataStorageFromPlainObjectStorage::getFileSizeIfExists(const String & path) const
{
auto get = [&] -> std::shared_ptr<uint64_t>
{
auto object_key = object_storage->generateObjectKeyForPath(path, std::nullopt /* key_prefix */);
auto metadata = object_storage->tryGetObjectMetadata(object_key.serialize());
if (metadata)
return std::make_shared<uint64_t>(metadata->size_bytes);
return nullptr;
};
if (auto res = getObjectMetadataEntryWithCache(path))
return res->file_size;
return std::nullopt;
}
std::shared_ptr<uint64_t> res;
if (file_sizes_cache)
res = file_sizes_cache->getOrSet(path, get).first;
else
res = get();
if (res)
Poco::Timestamp MetadataStorageFromPlainObjectStorage::getLastModified(const std::string & path) const
{
if (auto res = getLastModifiedIfExists(path))
return *res;
else
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "File or directory {} does not exist on {}", path, object_storage->getName());
}
std::optional<Poco::Timestamp> MetadataStorageFromPlainObjectStorage::getLastModifiedIfExists(const std::string & path) const
{
/// Since the plain object storage is used for backups only, return the current time.
if (existsFileOrDirectory(path))
return Poco::Timestamp{};
return std::nullopt;
}
@ -161,6 +169,31 @@ std::optional<StoredObjects> MetadataStorageFromPlainObjectStorage::getStorageOb
return std::nullopt;
}
MetadataStorageFromPlainObjectStorage::ObjectMetadataEntryPtr
MetadataStorageFromPlainObjectStorage::getObjectMetadataEntryWithCache(const std::string & path) const
{
auto object_key = object_storage->generateObjectKeyForPath(path, std::nullopt /* key_prefix */);
auto get = [&] -> ObjectMetadataEntryPtr
{
if (auto metadata = object_storage->tryGetObjectMetadata(object_key.serialize()))
return std::make_shared<ObjectMetadataEntry>(metadata->size_bytes, metadata->last_modified.epochTime());
return nullptr;
};
if (object_metadata_cache)
{
SipHash hash;
hash.update(object_key.serialize());
auto hash128 = hash.get128();
if (auto res = object_metadata_cache->get(hash128))
return res;
if (auto mapped = get())
return object_metadata_cache->getOrSet(hash128, [&] { return mapped; }).first;
return object_metadata_cache->get(hash128);
}
return get();
}
const IMetadataStorage & MetadataStorageFromPlainObjectStorageTransaction::getStorageForNonTransactionalReads() const
{
return metadata_storage;
@ -225,8 +258,17 @@ void MetadataStorageFromPlainObjectStorageTransaction::addBlobToMetadata(
/// Noop, local metadata files is only one file, it is the metadata file itself.
}
UnlinkMetadataFileOperationOutcomePtr MetadataStorageFromPlainObjectStorageTransaction::unlinkMetadata(const std::string &)
UnlinkMetadataFileOperationOutcomePtr MetadataStorageFromPlainObjectStorageTransaction::unlinkMetadata(const std::string & path)
{
/// The record has become stale, remove it from cache.
if (metadata_storage.object_metadata_cache)
{
auto object_key = object_storage->generateObjectKeyForPath(path, std::nullopt /* key_prefix */);
SipHash hash;
hash.update(object_key.serialize());
metadata_storage.object_metadata_cache->remove(hash.get128());
}
/// No hardlinks, so will always remove file.
return std::make_shared<UnlinkMetadataFileOperationOutcome>(UnlinkMetadataFileOperationOutcome{0});
}

View File

@ -1,21 +1,24 @@
#pragma once
#include <Core/Types.h>
#include <Disks/IDisk.h>
#include <Disks/ObjectStorages/IMetadataStorage.h>
#include <Disks/ObjectStorages/InMemoryPathMap.h>
#include <Disks/ObjectStorages/InMemoryDirectoryPathMap.h>
#include <Disks/ObjectStorages/MetadataOperationsHolder.h>
#include <Disks/ObjectStorages/MetadataStorageTransactionState.h>
#include <Common/CacheBase.h>
#include <map>
#include <memory>
#include <string>
#include <unordered_set>
#include <Poco/Timestamp.h>
namespace DB
{
struct InMemoryPathMap;
struct InMemoryDirectoryPathMap;
struct UnlinkMetadataFileOperationOutcome;
using UnlinkMetadataFileOperationOutcomePtr = std::shared_ptr<UnlinkMetadataFileOperationOutcome>;
@ -33,16 +36,24 @@ class MetadataStorageFromPlainObjectStorage : public IMetadataStorage
{
private:
friend class MetadataStorageFromPlainObjectStorageTransaction;
mutable std::optional<CacheBase<String, uint64_t>> file_sizes_cache;
protected:
struct ObjectMetadataEntry
{
uint64_t file_size;
time_t last_modified;
};
using ObjectMetadataEntryPtr = std::shared_ptr<ObjectMetadataEntry>;
ObjectStoragePtr object_storage;
String storage_path_prefix;
const String storage_path_prefix;
mutable std::optional<CacheBase<UInt128, ObjectMetadataEntry>> object_metadata_cache;
mutable SharedMutex metadata_mutex;
public:
MetadataStorageFromPlainObjectStorage(ObjectStoragePtr object_storage_, String storage_path_prefix_, size_t file_sizes_cache_size);
MetadataStorageFromPlainObjectStorage(ObjectStoragePtr object_storage_, String storage_path_prefix_, size_t object_metadata_cache_size);
MetadataTransactionPtr createTransaction() override;
@ -66,11 +77,8 @@ public:
StoredObjects getStorageObjects(const std::string & path) const override;
std::optional<StoredObjects> getStorageObjectsIfExist(const std::string & path) const override;
Poco::Timestamp getLastModified(const std::string & /* path */) const override
{
/// Required by MergeTree
return {};
}
Poco::Timestamp getLastModified(const std::string & path) const override;
std::optional<Poco::Timestamp> getLastModifiedIfExists(const String & path) const override;
uint32_t getHardlinkCount(const std::string & /* path */) const override
{
@ -85,7 +93,9 @@ protected:
virtual std::string getMetadataKeyPrefix() const { return object_storage->getCommonKeyPrefix(); }
/// Returns a map of virtual filesystem paths to paths in the object storage.
virtual std::shared_ptr<InMemoryPathMap> getPathMap() const { throwNotImplemented(); }
virtual std::shared_ptr<InMemoryDirectoryPathMap> getPathMap() const { throwNotImplemented(); }
ObjectMetadataEntryPtr getObjectMetadataEntryWithCache(const std::string & path) const;
};
class MetadataStorageFromPlainObjectStorageTransaction final : public IMetadataTransaction, private MetadataOperationsHolder

View File

@ -1,8 +1,9 @@
#include "MetadataStorageFromPlainObjectStorageOperations.h"
#include <Disks/ObjectStorages/InMemoryPathMap.h>
#include <Disks/ObjectStorages/InMemoryDirectoryPathMap.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Poco/Timestamp.h>
#include <Common/Exception.h>
#include <Common/SharedLockGuard.h>
#include <Common/logger_useful.h>
@ -30,7 +31,10 @@ ObjectStorageKey createMetadataObjectKey(const std::string & object_key_prefix,
}
MetadataStorageFromPlainObjectStorageCreateDirectoryOperation::MetadataStorageFromPlainObjectStorageCreateDirectoryOperation(
std::filesystem::path && path_, InMemoryPathMap & path_map_, ObjectStoragePtr object_storage_, const std::string & metadata_key_prefix_)
std::filesystem::path && path_,
InMemoryDirectoryPathMap & path_map_,
ObjectStoragePtr object_storage_,
const std::string & metadata_key_prefix_)
: path(std::move(path_))
, path_map(path_map_)
, object_storage(object_storage_)
@ -71,7 +75,8 @@ void MetadataStorageFromPlainObjectStorageCreateDirectoryOperation::execute(std:
{
std::lock_guard lock(path_map.mutex);
auto & map = path_map.map;
[[maybe_unused]] auto result = map.emplace(base_path, object_key_prefix);
[[maybe_unused]] auto result
= map.emplace(base_path, InMemoryDirectoryPathMap::RemotePathInfo{object_key_prefix, Poco::Timestamp{}.epochTime()});
chassert(result.second);
}
auto metric = object_storage->getMetadataStorageMetrics().directory_map_size;
@ -109,7 +114,7 @@ void MetadataStorageFromPlainObjectStorageCreateDirectoryOperation::undo(std::un
MetadataStorageFromPlainObjectStorageMoveDirectoryOperation::MetadataStorageFromPlainObjectStorageMoveDirectoryOperation(
std::filesystem::path && path_from_,
std::filesystem::path && path_to_,
InMemoryPathMap & path_map_,
InMemoryDirectoryPathMap & path_map_,
ObjectStoragePtr object_storage_,
const std::string & metadata_key_prefix_)
: path_from(std::move(path_from_))
@ -139,7 +144,7 @@ std::unique_ptr<WriteBufferFromFileBase> MetadataStorageFromPlainObjectStorageMo
throw Exception(
ErrorCodes::FILE_ALREADY_EXISTS, "Metadata object for the new (destination) path '{}' already exists", new_path);
remote_path = expected_it->second;
remote_path = expected_it->second.path;
}
auto metadata_object_key = createMetadataObjectKey(remote_path, metadata_key_prefix);
@ -190,6 +195,7 @@ void MetadataStorageFromPlainObjectStorageMoveDirectoryOperation::execute(std::u
auto & map = path_map.map;
[[maybe_unused]] auto result = map.emplace(base_path_to, map.extract(base_path_from).mapped());
chassert(result.second);
result.first->second.last_modified = Poco::Timestamp{}.epochTime();
}
write_finalized = true;
@ -213,7 +219,10 @@ void MetadataStorageFromPlainObjectStorageMoveDirectoryOperation::undo(std::uniq
}
MetadataStorageFromPlainObjectStorageRemoveDirectoryOperation::MetadataStorageFromPlainObjectStorageRemoveDirectoryOperation(
std::filesystem::path && path_, InMemoryPathMap & path_map_, ObjectStoragePtr object_storage_, const std::string & metadata_key_prefix_)
std::filesystem::path && path_,
InMemoryDirectoryPathMap & path_map_,
ObjectStoragePtr object_storage_,
const std::string & metadata_key_prefix_)
: path(std::move(path_)), path_map(path_map_), object_storage(object_storage_), metadata_key_prefix(metadata_key_prefix_)
{
chassert(path.string().ends_with('/'));
@ -229,7 +238,7 @@ void MetadataStorageFromPlainObjectStorageRemoveDirectoryOperation::execute(std:
auto path_it = map.find(base_path);
if (path_it == map.end())
return;
key_prefix = path_it->second;
key_prefix = path_it->second.path;
}
LOG_TRACE(getLogger("MetadataStorageFromPlainObjectStorageRemoveDirectoryOperation"), "Removing directory '{}'", path);

View File

@ -1,7 +1,7 @@
#pragma once
#include <Disks/ObjectStorages/IMetadataOperation.h>
#include <Disks/ObjectStorages/InMemoryPathMap.h>
#include <Disks/ObjectStorages/InMemoryDirectoryPathMap.h>
#include <Disks/ObjectStorages/MetadataStorageFromPlainObjectStorage.h>
#include <filesystem>
@ -14,7 +14,7 @@ class MetadataStorageFromPlainObjectStorageCreateDirectoryOperation final : publ
{
private:
std::filesystem::path path;
InMemoryPathMap & path_map;
InMemoryDirectoryPathMap & path_map;
ObjectStoragePtr object_storage;
const std::string metadata_key_prefix;
const std::string object_key_prefix;
@ -26,7 +26,7 @@ public:
MetadataStorageFromPlainObjectStorageCreateDirectoryOperation(
/// path_ must end with a trailing '/'.
std::filesystem::path && path_,
InMemoryPathMap & path_map_,
InMemoryDirectoryPathMap & path_map_,
ObjectStoragePtr object_storage_,
const std::string & metadata_key_prefix_);
@ -39,7 +39,7 @@ class MetadataStorageFromPlainObjectStorageMoveDirectoryOperation final : public
private:
std::filesystem::path path_from;
std::filesystem::path path_to;
InMemoryPathMap & path_map;
InMemoryDirectoryPathMap & path_map;
ObjectStoragePtr object_storage;
const std::string metadata_key_prefix;
@ -54,7 +54,7 @@ public:
/// Both path_from_ and path_to_ must end with a trailing '/'.
std::filesystem::path && path_from_,
std::filesystem::path && path_to_,
InMemoryPathMap & path_map_,
InMemoryDirectoryPathMap & path_map_,
ObjectStoragePtr object_storage_,
const std::string & metadata_key_prefix_);
@ -68,7 +68,7 @@ class MetadataStorageFromPlainObjectStorageRemoveDirectoryOperation final : publ
private:
std::filesystem::path path;
InMemoryPathMap & path_map;
InMemoryDirectoryPathMap & path_map;
ObjectStoragePtr object_storage;
const std::string metadata_key_prefix;
@ -79,7 +79,7 @@ public:
MetadataStorageFromPlainObjectStorageRemoveDirectoryOperation(
/// path_ must end with a trailing '/'.
std::filesystem::path && path_,
InMemoryPathMap & path_map_,
InMemoryDirectoryPathMap & path_map_,
ObjectStoragePtr object_storage_,
const std::string & metadata_key_prefix_);

View File

@ -1,12 +1,17 @@
#include <Disks/ObjectStorages/FlatDirectoryStructureKeyGenerator.h>
#include <Disks/ObjectStorages/InMemoryPathMap.h>
#include <Disks/ObjectStorages/InMemoryDirectoryPathMap.h>
#include <Disks/ObjectStorages/MetadataStorageFromPlainRewritableObjectStorage.h>
#include <Disks/ObjectStorages/ObjectStorageIterator.h>
#include <cstddef>
#include <exception>
#include <optional>
#include <unordered_set>
#include <IO/ReadHelpers.h>
#include <IO/S3Common.h>
#include <IO/SharedThreadPools.h>
#include <Poco/Timestamp.h>
#include "Common/Exception.h"
#include <Common/SharedLockGuard.h>
#include <Common/SharedMutex.h>
#include <Common/logger_useful.h>
@ -40,10 +45,10 @@ std::string getMetadataKeyPrefix(ObjectStoragePtr object_storage)
: metadata_key_prefix;
}
std::shared_ptr<InMemoryPathMap> loadPathPrefixMap(const std::string & metadata_key_prefix, ObjectStoragePtr object_storage)
std::shared_ptr<InMemoryDirectoryPathMap> loadPathPrefixMap(const std::string & metadata_key_prefix, ObjectStoragePtr object_storage)
{
auto result = std::make_shared<InMemoryPathMap>();
using Map = InMemoryPathMap::Map;
auto result = std::make_shared<InMemoryDirectoryPathMap>();
using Map = InMemoryDirectoryPathMap::Map;
ThreadPool & pool = getIOThreadPool().get();
ThreadPoolCallbackRunnerLocal<void> runner(pool, "PlainRWMetaLoad");
@ -73,17 +78,24 @@ std::shared_ptr<InMemoryPathMap> loadPathPrefixMap(const std::string & metadata_
StoredObject object{path};
String local_path;
Poco::Timestamp last_modified{};
try
{
auto read_buf = object_storage->readObject(object, settings);
readStringUntilEOF(local_path, *read_buf);
auto object_metadata = object_storage->tryGetObjectMetadata(path);
/// It ok if a directory was removed just now.
/// We support attaching a filesystem that is concurrently modified by someone else.
if (!object_metadata)
return;
/// Assuming that local and the object storage clocks are synchronized.
last_modified = object_metadata->last_modified;
}
#if USE_AWS_S3
catch (const S3Exception & e)
{
/// It is ok if a directory was removed just now.
/// We support attaching a filesystem that is concurrently modified by someone else.
if (e.getS3ErrorCode() == Aws::S3::S3Errors::NO_SUCH_KEY)
return;
throw;
@ -101,18 +113,19 @@ std::shared_ptr<InMemoryPathMap> loadPathPrefixMap(const std::string & metadata_
std::pair<Map::iterator, bool> res;
{
std::lock_guard lock(result->mutex);
res = result->map.emplace(std::filesystem::path(local_path).parent_path(), remote_path.parent_path());
res = result->map.emplace(
std::filesystem::path(local_path).parent_path(),
InMemoryDirectoryPathMap::RemotePathInfo{remote_path.parent_path(), last_modified.epochTime()});
}
/// This can happen if table replication is enabled, then the same local path is written
/// in `prefix.path` of each replica.
/// TODO: should replicated tables (e.g., RMT) be explicitly disallowed?
if (!res.second)
LOG_WARNING(
log,
"The local path '{}' is already mapped to a remote path '{}', ignoring: '{}'",
local_path,
res.first->second,
res.first->second.path,
remote_path.parent_path().string());
});
}
@ -132,7 +145,7 @@ void getDirectChildrenOnDiskImpl(
const std::string & storage_key,
const RelativePathsWithMetadata & remote_paths,
const std::string & local_path,
const InMemoryPathMap & path_map,
const InMemoryDirectoryPathMap & path_map,
std::unordered_set<std::string> & result)
{
/// Directories are retrieved from the in-memory path map.
@ -180,8 +193,8 @@ void getDirectChildrenOnDiskImpl(
}
MetadataStorageFromPlainRewritableObjectStorage::MetadataStorageFromPlainRewritableObjectStorage(
ObjectStoragePtr object_storage_, String storage_path_prefix_, size_t file_sizes_cache_size)
: MetadataStorageFromPlainObjectStorage(object_storage_, storage_path_prefix_, file_sizes_cache_size)
ObjectStoragePtr object_storage_, String storage_path_prefix_, size_t object_metadata_cache_size)
: MetadataStorageFromPlainObjectStorage(object_storage_, storage_path_prefix_, object_metadata_cache_size)
, metadata_key_prefix(DB::getMetadataKeyPrefix(object_storage))
, path_map(loadPathPrefixMap(metadata_key_prefix, object_storage))
{
@ -215,9 +228,7 @@ bool MetadataStorageFromPlainRewritableObjectStorage::existsFileOrDirectory(cons
if (existsDirectory(path))
return true;
ObjectStorageKey object_key = object_storage->generateObjectKeyForPath(path, std::nullopt /* key_prefix */);
StoredObject object(object_key.serialize(), path);
return object_storage->exists(object);
return getObjectMetadataEntryWithCache(path) != nullptr;
}
bool MetadataStorageFromPlainRewritableObjectStorage::existsFile(const std::string & path) const
@ -225,19 +236,12 @@ bool MetadataStorageFromPlainRewritableObjectStorage::existsFile(const std::stri
if (existsDirectory(path))
return false;
ObjectStorageKey object_key = object_storage->generateObjectKeyForPath(path, std::nullopt /* key_prefix */);
StoredObject object(object_key.serialize(), path);
return object_storage->exists(object);
return getObjectMetadataEntryWithCache(path) != nullptr;
}
bool MetadataStorageFromPlainRewritableObjectStorage::existsDirectory(const std::string & path) const
{
auto base_path = path;
if (base_path.ends_with('/'))
base_path.pop_back();
SharedLockGuard lock(path_map->mutex);
return path_map->map.find(base_path) != path_map->map.end();
return path_map->getRemotePathInfoIfExists(path) != std::nullopt;
}
std::vector<std::string> MetadataStorageFromPlainRewritableObjectStorage::listDirectory(const std::string & path) const
@ -255,6 +259,18 @@ std::vector<std::string> MetadataStorageFromPlainRewritableObjectStorage::listDi
return std::vector<std::string>(std::make_move_iterator(directories.begin()), std::make_move_iterator(directories.end()));
}
std::optional<Poco::Timestamp> MetadataStorageFromPlainRewritableObjectStorage::getLastModifiedIfExists(const String & path) const
{
/// Path corresponds to a directory.
if (auto remote = path_map->getRemotePathInfoIfExists(path))
return Poco::Timestamp::fromEpochTime(remote->last_modified);
/// A file.
if (auto res = getObjectMetadataEntryWithCache(path))
return Poco::Timestamp::fromEpochTime(res->last_modified);
return std::nullopt;
}
void MetadataStorageFromPlainRewritableObjectStorage::getDirectChildrenOnDisk(
const std::string & storage_key,
const RelativePathsWithMetadata & remote_paths,

View File

@ -13,21 +13,28 @@ class MetadataStorageFromPlainRewritableObjectStorage final : public MetadataSto
{
private:
const std::string metadata_key_prefix;
std::shared_ptr<InMemoryPathMap> path_map;
std::shared_ptr<InMemoryDirectoryPathMap> path_map;
public:
MetadataStorageFromPlainRewritableObjectStorage(ObjectStoragePtr object_storage_, String storage_path_prefix_, size_t file_sizes_cache_size);
MetadataStorageFromPlainRewritableObjectStorage(
ObjectStoragePtr object_storage_, String storage_path_prefix_, size_t object_metadata_cache_size);
~MetadataStorageFromPlainRewritableObjectStorage() override;
MetadataStorageType getType() const override { return MetadataStorageType::PlainRewritable; }
bool existsFile(const std::string & path) const override;
bool existsDirectory(const std::string & path) const override;
bool existsFileOrDirectory(const std::string & path) const override;
std::vector<std::string> listDirectory(const std::string & path) const override;
std::optional<Poco::Timestamp> getLastModifiedIfExists(const String & path) const override;
protected:
std::string getMetadataKeyPrefix() const override { return metadata_key_prefix; }
std::shared_ptr<InMemoryPathMap> getPathMap() const override { return path_map; }
std::shared_ptr<InMemoryDirectoryPathMap> getPathMap() const override { return path_map; }
void getDirectChildrenOnDisk(
const std::string & storage_key,
const RelativePathsWithMetadata & remote_paths,

View File

@ -2,7 +2,9 @@
<storage_configuration>
<disks>
<disk_s3_plain_rewritable>
<type>s3_plain_rewritable</type>
<type>object_storage</type>
<object_storage_type>s3</object_storage_type>
<metadata_type>plain_rewritable</metadata_type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<endpoint_subpath from_env="ENDPOINT_SUBPATH"></endpoint_subpath>
<access_key_id>minio</access_key_id>
@ -15,6 +17,16 @@
<max_size>1000000000</max_size>
<cache_on_write_operations>1</cache_on_write_operations>
</disk_cache_s3_plain_rewritable>
<disk_s3_plain_rewritable_with_metadata_cache>
<type>object_storage</type>
<object_storage_type>s3</object_storage_type>
<metadata_type>plain_rewritable</metadata_type>
<endpoint>http://minio1:9001/root/data_with_cache/</endpoint>
<endpoint_subpath from_env="ENDPOINT_SUBPATH"></endpoint_subpath>
<object_metadata_cache_size>1000</object_metadata_cache_size>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</disk_s3_plain_rewritable_with_metadata_cache>
</disks>
<policies>
<s3_plain_rewritable>
@ -31,6 +43,13 @@
</main>
</volumes>
</cache_s3_plain_rewritable>
<s3_plain_rewritable_with_metadata_cache>
<volumes>
<main>
<disk>disk_s3_plain_rewritable_with_metadata_cache</disk>
</main>
</volumes>
</s3_plain_rewritable_with_metadata_cache>
</policies>
</storage_configuration>
</clickhouse>

View File

@ -45,13 +45,14 @@ def start_cluster():
@pytest.mark.parametrize(
"storage_policy",
"storage_policy,key_prefix",
[
pytest.param("s3_plain_rewritable"),
pytest.param("cache_s3_plain_rewritable"),
pytest.param("s3_plain_rewritable", "data/"),
pytest.param("cache_s3_plain_rewritable", "data/"),
pytest.param("s3_plain_rewritable_with_metadata_cache", "data_with_cache/"),
],
)
def test(storage_policy):
def test(storage_policy, key_prefix):
def create_insert(node, insert_values):
node.query(
"""
@ -141,7 +142,7 @@ def test(storage_policy):
)
metadata_it = cluster.minio_client.list_objects(
cluster.minio_bucket, "data/", recursive=True
cluster.minio_bucket, key_prefix, recursive=True
)
metadata_count = 0
for obj in list(metadata_it):
@ -158,7 +159,7 @@ def test(storage_policy):
node.query("DROP TABLE IF EXISTS test SYNC")
it = cluster.minio_client.list_objects(
cluster.minio_bucket, "data/", recursive=True
cluster.minio_bucket, key_prefix, recursive=True
)
assert len(list(it)) == 0