mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-25 17:12:03 +00:00
Review fixes
This commit is contained in:
parent
bf1c99daab
commit
4c88527c8e
@ -242,7 +242,7 @@ int getINodeNumberFromPath(const String & path)
|
||||
return file_stat.st_ino;
|
||||
}
|
||||
|
||||
std::optional<size_t> getFileSizeIfPossible(const String & path)
|
||||
std::optional<size_t> tryGetSizeFromFilePath(const String & path)
|
||||
{
|
||||
std::error_code ec;
|
||||
|
||||
|
@ -66,9 +66,12 @@ bool fileOrSymlinkPathStartsWith(const String & path, const String & prefix_path
|
||||
|
||||
size_t getSizeFromFileDescriptor(int fd, const String & file_name = "");
|
||||
|
||||
std::optional<size_t> tryGetSizeFromFilePath(const String & path);
|
||||
|
||||
/// Get inode number for a file path.
|
||||
/// Will not work correctly if filesystem does not support inodes.
|
||||
int getINodeNumberFromPath(const String & path);
|
||||
|
||||
std::optional<size_t> getFileSizeIfPossible(const String & path);
|
||||
}
|
||||
|
||||
namespace FS
|
||||
|
@ -236,9 +236,9 @@ void DiskDecorator::applyNewSettings(const Poco::Util::AbstractConfiguration & c
|
||||
delegate->applyNewSettings(config, context, config_prefix, map);
|
||||
}
|
||||
|
||||
DiskObjectStoragePtr DiskDecorator::getObjectStorage(const String & name)
|
||||
DiskObjectStoragePtr DiskDecorator::createDiskObjectStorage(const String & name)
|
||||
{
|
||||
return delegate->getObjectStorage(name);
|
||||
return delegate->createDiskObjectStorage(name);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -81,11 +81,11 @@ public:
|
||||
void startup(ContextPtr context) override;
|
||||
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String & config_prefix, const DisksMap & map) override;
|
||||
|
||||
bool isCached() const override { return delegate->isCached(); }
|
||||
bool supportsCache() const override { return delegate->supportsCache(); }
|
||||
String getCacheBasePath() const override { return delegate->getCacheBasePath(); }
|
||||
|
||||
StoredObjects getStorageObjects(const String & path) const override { return delegate->getStorageObjects(path); }
|
||||
DiskObjectStoragePtr getObjectStorage(const String &) override;
|
||||
DiskObjectStoragePtr createDiskObjectStorage(const String &) override;
|
||||
|
||||
void getRemotePathsRecursive(const String & path, std::vector<LocalPathWithObjectStoragePaths> & paths_map) override { return delegate->getRemotePathsRecursive(path, paths_map); }
|
||||
|
||||
|
@ -219,7 +219,7 @@ public:
|
||||
|
||||
virtual String getCacheBasePath() const { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "There is no cache path"); }
|
||||
|
||||
virtual bool isCached() const { return false; }
|
||||
virtual bool supportsCache() const { return false; }
|
||||
|
||||
/// Returns a list of storage objects (contains path, size, ...).
|
||||
/// (A list is returned because for Log family engines there might
|
||||
@ -340,11 +340,11 @@ public:
|
||||
/// Return current disk revision.
|
||||
virtual UInt64 getRevision() const { return 0; }
|
||||
|
||||
virtual DiskObjectStoragePtr getObjectStorage(const String &)
|
||||
virtual DiskObjectStoragePtr createDiskObjectStorage(const String &)
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::NOT_IMPLEMENTED,
|
||||
"Method getObjectStorage() is not implemented for disk type: {}",
|
||||
"Method createDiskObjectStorage() is not implemented for disk type: {}",
|
||||
getType());
|
||||
}
|
||||
|
||||
|
@ -43,7 +43,7 @@ bool AzureObjectStorage::exists(const StoredObject & object) const
|
||||
|
||||
/// What a shame, no Exists method...
|
||||
Azure::Storage::Blobs::ListBlobsOptions options;
|
||||
options.Prefix = object.path;
|
||||
options.Prefix = object.absolute_path;
|
||||
options.PageSizeHint = 1;
|
||||
|
||||
auto blobs_list_response = client_ptr->ListBlobs(options);
|
||||
@ -51,7 +51,7 @@ bool AzureObjectStorage::exists(const StoredObject & object) const
|
||||
|
||||
for (const auto & blob : blobs_list)
|
||||
{
|
||||
if (object.path == blob.Name)
|
||||
if (object.absolute_path == blob.Name)
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -67,7 +67,7 @@ std::unique_ptr<ReadBufferFromFileBase> AzureObjectStorage::readObject( /// NOLI
|
||||
auto settings_ptr = settings.get();
|
||||
|
||||
return std::make_unique<ReadBufferFromAzureBlobStorage>(
|
||||
client.get(), object.path, read_settings, settings_ptr->max_single_read_retries,
|
||||
client.get(), object.absolute_path, read_settings, settings_ptr->max_single_read_retries,
|
||||
settings_ptr->max_single_download_retries, read_settings.remote_fs_buffer_size);
|
||||
}
|
||||
|
||||
@ -111,11 +111,11 @@ std::unique_ptr<WriteBufferFromFileBase> AzureObjectStorage::writeObject( /// NO
|
||||
|
||||
auto buffer = std::make_unique<WriteBufferFromAzureBlobStorage>(
|
||||
client.get(),
|
||||
object.path,
|
||||
object.absolute_path,
|
||||
settings.get()->max_single_part_upload_size,
|
||||
buf_size);
|
||||
|
||||
return std::make_unique<WriteIndirectBufferFromRemoteFS>(std::move(buffer), std::move(finalize_callback), object.path);
|
||||
return std::make_unique<WriteIndirectBufferFromRemoteFS>(std::move(buffer), std::move(finalize_callback), object.absolute_path);
|
||||
}
|
||||
|
||||
void AzureObjectStorage::listPrefix(const std::string & path, RelativePathsWithSize & children) const
|
||||
@ -135,7 +135,7 @@ void AzureObjectStorage::listPrefix(const std::string & path, RelativePathsWithS
|
||||
/// Remove file. Throws exception if file doesn't exists or it's a directory.
|
||||
void AzureObjectStorage::removeObject(const StoredObject & object)
|
||||
{
|
||||
const auto & path = object.path;
|
||||
const auto & path = object.absolute_path;
|
||||
auto client_ptr = client.get();
|
||||
auto delete_info = client_ptr->DeleteBlob(path);
|
||||
if (!delete_info.Value.Deleted)
|
||||
@ -147,23 +147,23 @@ void AzureObjectStorage::removeObjects(const StoredObjects & objects)
|
||||
auto client_ptr = client.get();
|
||||
for (const auto & object : objects)
|
||||
{
|
||||
auto delete_info = client_ptr->DeleteBlob(object.path);
|
||||
auto delete_info = client_ptr->DeleteBlob(object.absolute_path);
|
||||
if (!delete_info.Value.Deleted)
|
||||
throw Exception(ErrorCodes::AZURE_BLOB_STORAGE_ERROR, "Failed to delete file in AzureBlob Storage: {}", object.path);
|
||||
throw Exception(ErrorCodes::AZURE_BLOB_STORAGE_ERROR, "Failed to delete file in AzureBlob Storage: {}", object.absolute_path);
|
||||
}
|
||||
}
|
||||
|
||||
void AzureObjectStorage::removeObjectIfExists(const StoredObject & object)
|
||||
{
|
||||
auto client_ptr = client.get();
|
||||
auto delete_info = client_ptr->DeleteBlob(object.path);
|
||||
auto delete_info = client_ptr->DeleteBlob(object.absolute_path);
|
||||
}
|
||||
|
||||
void AzureObjectStorage::removeObjectsIfExist(const StoredObjects & objects)
|
||||
{
|
||||
auto client_ptr = client.get();
|
||||
for (const auto & object : objects)
|
||||
auto delete_info = client_ptr->DeleteBlob(object.path);
|
||||
auto delete_info = client_ptr->DeleteBlob(object.absolute_path);
|
||||
}
|
||||
|
||||
|
||||
@ -190,8 +190,8 @@ void AzureObjectStorage::copyObject( /// NOLINT
|
||||
std::optional<ObjectAttributes> object_to_attributes)
|
||||
{
|
||||
auto client_ptr = client.get();
|
||||
auto dest_blob_client = client_ptr->GetBlobClient(object_to.path);
|
||||
auto source_blob_client = client_ptr->GetBlobClient(object_from.path);
|
||||
auto dest_blob_client = client_ptr->GetBlobClient(object_to.absolute_path);
|
||||
auto source_blob_client = client_ptr->GetBlobClient(object_from.absolute_path);
|
||||
|
||||
Azure::Storage::Blobs::CopyBlobFromUriOptions copy_options;
|
||||
if (object_to_attributes.has_value())
|
||||
|
@ -49,6 +49,8 @@ public:
|
||||
AzureClientPtr && client_,
|
||||
SettingsPtr && settings_);
|
||||
|
||||
std::string getName() const override { return "AzureObjectStorage"; }
|
||||
|
||||
bool exists(const StoredObject & object) const override;
|
||||
|
||||
std::unique_ptr<ReadBufferFromFileBase> readObject( /// NOLINT
|
||||
|
@ -104,8 +104,21 @@ void registerDiskAzureBlobStorage(DiskFactory & factory)
|
||||
|
||||
azure_blob_storage_disk->startup(context);
|
||||
|
||||
#ifdef NDEBUG
|
||||
bool use_cache = true;
|
||||
#else
|
||||
/// Current cache implementation lead to allocations in destructor of
|
||||
/// read buffer.
|
||||
bool use_cache = false;
|
||||
#endif
|
||||
if (config.getBool(config_prefix + ".cache_enabled", use_cache))
|
||||
{
|
||||
String cache_path = config.getString(config_prefix + ".cache_path", context->getPath() + "disks/" + name + "/cache/");
|
||||
azure_blob_storage_disk = wrapWithCache(azure_blob_storage_disk, "azure-blob-storage-cache", cache_path, metadata_path);
|
||||
}
|
||||
return std::make_shared<DiskRestartProxy>(azure_blob_storage_disk);
|
||||
};
|
||||
|
||||
factory.registerDiskType("azure_blob_storage", creator);
|
||||
}
|
||||
|
||||
|
@ -251,7 +251,7 @@ String DiskObjectStorage::getUniqueId(const String & path) const
|
||||
String id;
|
||||
auto blobs_paths = metadata_storage->getStorageObjects(path);
|
||||
if (!blobs_paths.empty())
|
||||
id = blobs_paths[0].path;
|
||||
id = blobs_paths[0].absolute_path;
|
||||
return id;
|
||||
}
|
||||
|
||||
@ -435,12 +435,12 @@ std::optional<UInt64> DiskObjectStorage::tryReserve(UInt64 bytes)
|
||||
return {};
|
||||
}
|
||||
|
||||
bool DiskObjectStorage::isCached() const
|
||||
bool DiskObjectStorage::supportsCache() const
|
||||
{
|
||||
return object_storage->isCached();
|
||||
return object_storage->supportsCache();
|
||||
}
|
||||
|
||||
DiskObjectStoragePtr DiskObjectStorage::getObjectStorage(const String & name_)
|
||||
DiskObjectStoragePtr DiskObjectStorage::createDiskObjectStorage(const String & name_)
|
||||
{
|
||||
return std::make_shared<DiskObjectStorage>(
|
||||
name_,
|
||||
|
@ -164,9 +164,9 @@ public:
|
||||
|
||||
UInt64 getRevision() const override;
|
||||
|
||||
DiskObjectStoragePtr getObjectStorage(const String & name_) override;
|
||||
DiskObjectStoragePtr createDiskObjectStorage(const String & name_) override;
|
||||
|
||||
bool isCached() const override;
|
||||
bool supportsCache() const override;
|
||||
|
||||
private:
|
||||
|
||||
|
@ -98,7 +98,7 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::migrateFileToRestorableSchema
|
||||
ObjectAttributes metadata {
|
||||
{"path", path}
|
||||
};
|
||||
updateObjectMetadata(object.path, metadata);
|
||||
updateObjectMetadata(object.absolute_path, metadata);
|
||||
}
|
||||
}
|
||||
void DiskObjectStorageRemoteMetadataRestoreHelper::migrateToRestorableSchemaRecursive(const String & path, Futures & results)
|
||||
|
@ -133,7 +133,7 @@ struct RemoveObjectStorageOperation final : public IDiskObjectStorageOperation
|
||||
if (remove_from_cache)
|
||||
{
|
||||
for (const auto & object : objects_to_remove)
|
||||
object_storage.removeCacheIfExists(object.getCacheHint());
|
||||
object_storage.removeCacheIfExists(object.getPathKeyForCache());
|
||||
}
|
||||
}
|
||||
};
|
||||
@ -227,7 +227,7 @@ struct RemoveRecursiveObjectStorageOperation final : public IDiskObjectStorageOp
|
||||
}
|
||||
|
||||
for (const auto & object : objects_to_remove_from_cache)
|
||||
object_storage.removeCacheIfExists(object.getCacheHint());
|
||||
object_storage.removeCacheIfExists(object.getPathKeyForCache());
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -40,7 +40,7 @@ std::string HDFSObjectStorage::generateBlobNameForPath(const std::string & /* pa
|
||||
|
||||
bool HDFSObjectStorage::exists(const StoredObject & object) const
|
||||
{
|
||||
const auto & path = object.path;
|
||||
const auto & path = object.absolute_path;
|
||||
const size_t begin_of_path = path.find('/', path.find("//") + 2);
|
||||
const String remote_fs_object_path = path.substr(begin_of_path);
|
||||
return (0 == hdfsExists(hdfs_fs.get(), remote_fs_object_path.c_str()));
|
||||
@ -52,7 +52,7 @@ std::unique_ptr<ReadBufferFromFileBase> HDFSObjectStorage::readObject( /// NOLIN
|
||||
std::optional<size_t>,
|
||||
std::optional<size_t>) const
|
||||
{
|
||||
return std::make_unique<ReadBufferFromHDFS>(object.path, object.path, config, read_settings);
|
||||
return std::make_unique<ReadBufferFromHDFS>(object.absolute_path, object.absolute_path, config, read_settings);
|
||||
}
|
||||
|
||||
std::unique_ptr<ReadBufferFromFileBase> HDFSObjectStorage::readObjects( /// NOLINT
|
||||
@ -81,10 +81,10 @@ std::unique_ptr<WriteBufferFromFileBase> HDFSObjectStorage::writeObject( /// NOL
|
||||
|
||||
/// Single O_WRONLY in libhdfs adds O_TRUNC
|
||||
auto hdfs_buffer = std::make_unique<WriteBufferFromHDFS>(
|
||||
object.path, config, settings->replication, buf_size,
|
||||
object.absolute_path, config, settings->replication, buf_size,
|
||||
mode == WriteMode::Rewrite ? O_WRONLY : O_WRONLY | O_APPEND);
|
||||
|
||||
return std::make_unique<WriteIndirectBufferFromRemoteFS>(std::move(hdfs_buffer), std::move(finalize_callback), object.path);
|
||||
return std::make_unique<WriteIndirectBufferFromRemoteFS>(std::move(hdfs_buffer), std::move(finalize_callback), object.absolute_path);
|
||||
}
|
||||
|
||||
|
||||
@ -103,7 +103,7 @@ void HDFSObjectStorage::listPrefix(const std::string & path, RelativePathsWithSi
|
||||
/// Remove file. Throws exception if file doesn't exists or it's a directory.
|
||||
void HDFSObjectStorage::removeObject(const StoredObject & object)
|
||||
{
|
||||
const auto & path = object.path;
|
||||
const auto & path = object.absolute_path;
|
||||
const size_t begin_of_path = path.find('/', path.find("//") + 2);
|
||||
|
||||
/// Add path from root to file name
|
||||
|
@ -50,6 +50,8 @@ public:
|
||||
, settings(std::move(settings_))
|
||||
{}
|
||||
|
||||
std::string getName() const override { return "HDFSObjectStorage"; }
|
||||
|
||||
bool exists(const StoredObject & object) const override;
|
||||
|
||||
std::unique_ptr<ReadBufferFromFileBase> readObject( /// NOLINT
|
||||
|
@ -38,7 +38,7 @@ void registerDiskHDFS(DiskFactory & factory)
|
||||
/// FIXME Cache currently unsupported :(
|
||||
ObjectStoragePtr hdfs_storage = std::make_unique<HDFSObjectStorage>(uri, std::move(settings), config);
|
||||
|
||||
auto [_, metadata_disk] = prepareForLocalMetadata(name, config, config_prefix, context_);
|
||||
auto [metadata_path, metadata_disk] = prepareForLocalMetadata(name, config, config_prefix, context_);
|
||||
|
||||
auto metadata_storage = std::make_shared<MetadataStorageFromRemoteDisk>(metadata_disk, uri);
|
||||
uint64_t copy_thread_pool_size = config.getUInt(config_prefix + ".thread_pool_size", 16);
|
||||
@ -53,6 +53,20 @@ void registerDiskHDFS(DiskFactory & factory)
|
||||
/* send_metadata = */ false,
|
||||
copy_thread_pool_size);
|
||||
|
||||
#ifdef NDEBUG
|
||||
bool use_cache = true;
|
||||
#else
|
||||
/// Current S3 cache implementation lead to allocations in destructor of
|
||||
/// read buffer.
|
||||
bool use_cache = false;
|
||||
#endif
|
||||
|
||||
if (config.getBool(config_prefix + ".cache_enabled", use_cache))
|
||||
{
|
||||
String cache_path = config.getString(config_prefix + ".cache_path", context_->getPath() + "disks/" + name + "/cache/");
|
||||
disk_result = wrapWithCache(disk_result, "hdfs-cache", cache_path, metadata_path);
|
||||
}
|
||||
|
||||
return std::make_shared<DiskRestartProxy>(disk_result);
|
||||
};
|
||||
|
||||
|
@ -128,9 +128,7 @@ public:
|
||||
/// Read multiple metadata files into strings and return mapping from file_path -> metadata
|
||||
virtual std::unordered_map<std::string, std::string> getSerializedMetadata(const std::vector<String> & file_paths) const = 0;
|
||||
|
||||
virtual std::string getObjectStoragePath() const = 0;
|
||||
|
||||
/// Return [(object_storage_path, size_in_bytes), ...] for metadata path.
|
||||
/// Return object information (absolute_path, bytes_size, ...) for metadata path.
|
||||
/// object_storage_path is absolute.
|
||||
virtual StoredObjects getStorageObjects(const std::string & path) const = 0;
|
||||
|
||||
|
@ -44,22 +44,22 @@ void IObjectStorage::copyObjectToAnotherObjectStorage( // NOLINT
|
||||
|
||||
std::string IObjectStorage::getCacheBasePath() const
|
||||
{
|
||||
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "getCacheBasePath() is not implemented for object storage");
|
||||
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "getCacheBasePath() is not implemented for {}", getName());
|
||||
}
|
||||
|
||||
StoredObject::StoredObject(
|
||||
const std::string & path_,
|
||||
const std::string & absolute_path_,
|
||||
uint64_t bytes_size_,
|
||||
std::function<String(const String &)> && cache_hint_creator_)
|
||||
: path(path_)
|
||||
PathKeyForCacheCreator && path_key_for_cache_creator_)
|
||||
: absolute_path(absolute_path_)
|
||||
, bytes_size(bytes_size_)
|
||||
, cache_hint_creator(std::move(cache_hint_creator_))
|
||||
, path_key_for_cache_creator(std::move(path_key_for_cache_creator_))
|
||||
{}
|
||||
|
||||
std::string StoredObject::getCacheHint() const
|
||||
std::string StoredObject::getPathKeyForCache() const
|
||||
{
|
||||
if (cache_hint_creator)
|
||||
return cache_hint_creator(path);
|
||||
if (path_key_for_cache_creator)
|
||||
return path_key_for_cache_creator(absolute_path);
|
||||
return "";
|
||||
}
|
||||
|
||||
|
@ -37,23 +37,23 @@ struct RelativePathWithSize
|
||||
using RelativePathsWithSize = std::vector<RelativePathWithSize>;
|
||||
|
||||
|
||||
/// Object metadata: path, size. cache_hint.
|
||||
/// Object metadata: path, size, path_key_for_cache.
|
||||
struct StoredObject
|
||||
{
|
||||
std::string path; /// absolute
|
||||
std::string absolute_path;
|
||||
uint64_t bytes_size;
|
||||
|
||||
/// Optional cache hint for cache. Use delayed initialization
|
||||
/// because somecache hint implementation requires it.
|
||||
using CacheHintCreator = std::function<std::string(const std::string &)>;
|
||||
CacheHintCreator cache_hint_creator;
|
||||
using PathKeyForCacheCreator = std::function<std::string(const std::string &)>;
|
||||
PathKeyForCacheCreator path_key_for_cache_creator;
|
||||
|
||||
StoredObject() = default;
|
||||
|
||||
explicit StoredObject(
|
||||
const std::string & path_, uint64_t bytes_size_ = 0, CacheHintCreator && cache_hint_creator_ = {});
|
||||
const std::string & absolute_path_, uint64_t bytes_size_ = 0, PathKeyForCacheCreator && path_key_for_cache_creator_ = {});
|
||||
|
||||
std::string getCacheHint() const;
|
||||
std::string getPathKeyForCache() const;
|
||||
};
|
||||
|
||||
using StoredObjects = std::vector<StoredObject>;
|
||||
@ -75,6 +75,8 @@ class IObjectStorage
|
||||
public:
|
||||
IObjectStorage() = default;
|
||||
|
||||
virtual std::string getName() const = 0;
|
||||
|
||||
/// Object exists or not
|
||||
virtual bool exists(const StoredObject & object) const = 0;
|
||||
|
||||
@ -168,15 +170,17 @@ public:
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
const std::string & config_prefix, ContextPtr context) = 0;
|
||||
|
||||
/// Generate object storage path.
|
||||
/// Generate blob name for passed absolute local path.
|
||||
/// Path can be generated either independently or based on `path`.
|
||||
virtual std::string generateBlobNameForPath(const std::string & path) = 0;
|
||||
|
||||
virtual bool supportsAppend() const { return false; }
|
||||
|
||||
/// Remove filesystem cache. `path` is a result of object.getPathKeyForCache() method,
|
||||
/// which is used to define a cache key for the source object path.
|
||||
virtual void removeCacheIfExists(const std::string & /* path */) {}
|
||||
|
||||
virtual bool isCached() const { return false; }
|
||||
virtual bool supportsCache() const { return false; }
|
||||
};
|
||||
|
||||
using ObjectStoragePtr = std::shared_ptr<IObjectStorage>;
|
||||
|
@ -32,7 +32,7 @@ LocalObjectStorage::LocalObjectStorage()
|
||||
|
||||
bool LocalObjectStorage::exists(const StoredObject & object) const
|
||||
{
|
||||
return fs::exists(object.path);
|
||||
return fs::exists(object.absolute_path);
|
||||
}
|
||||
|
||||
std::unique_ptr<ReadBufferFromFileBase> LocalObjectStorage::readObjects( /// NOLINT
|
||||
@ -53,13 +53,27 @@ std::unique_ptr<ReadBufferFromFileBase> LocalObjectStorage::readObject( /// NOLI
|
||||
std::optional<size_t> read_hint,
|
||||
std::optional<size_t> file_size) const
|
||||
{
|
||||
const auto & path = object.path;
|
||||
const auto & path = object.absolute_path;
|
||||
|
||||
if (!file_size.has_value())
|
||||
file_size = getFileSizeIfPossible(path);
|
||||
if (!file_size)
|
||||
file_size = tryGetSizeFromFilePath(path);
|
||||
|
||||
/// For now we cannot allow asynchrnous reader from local filesystem when CachedObjectStorage is used.
|
||||
ReadSettings modified_settings{read_settings};
|
||||
modified_settings.local_fs_method = LocalFSReadMethod::pread;
|
||||
switch (modified_settings.local_fs_method)
|
||||
{
|
||||
case LocalFSReadMethod::pread_threadpool:
|
||||
case LocalFSReadMethod::pread_fake_async:
|
||||
{
|
||||
modified_settings.local_fs_method = LocalFSReadMethod::pread;
|
||||
LOG_INFO(log, "Changing local filesystem read method to `pread`");
|
||||
break;
|
||||
}
|
||||
default:
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
LOG_TEST(log, "Read object: {}", path);
|
||||
return createReadBufferFromFileBase(path, modified_settings, read_hint, file_size);
|
||||
@ -73,7 +87,7 @@ std::unique_ptr<WriteBufferFromFileBase> LocalObjectStorage::writeObject( /// NO
|
||||
size_t buf_size,
|
||||
const WriteSettings & /* write_settings */)
|
||||
{
|
||||
const auto & path = object.path;
|
||||
const auto & path = object.absolute_path;
|
||||
int flags = (mode == WriteMode::Append) ? (O_APPEND | O_CREAT | O_WRONLY) : -1;
|
||||
LOG_TEST(log, "Write object: {}", path);
|
||||
return std::make_unique<WriteBufferFromFile>(path, buf_size, flags);
|
||||
@ -92,8 +106,8 @@ void LocalObjectStorage::removeObject(const StoredObject & object)
|
||||
if (!exists(object))
|
||||
return;
|
||||
|
||||
if (0 != unlink(object.path.data()))
|
||||
throwFromErrnoWithPath("Cannot unlink file " + object.path, object.path, ErrorCodes::CANNOT_UNLINK);
|
||||
if (0 != unlink(object.absolute_path.data()))
|
||||
throwFromErrnoWithPath("Cannot unlink file " + object.absolute_path, object.absolute_path, ErrorCodes::CANNOT_UNLINK);
|
||||
}
|
||||
|
||||
void LocalObjectStorage::removeObjects(const StoredObjects & objects)
|
||||
@ -122,11 +136,11 @@ ObjectMetadata LocalObjectStorage::getObjectMetadata(const std::string & /* path
|
||||
void LocalObjectStorage::copyObject( // NOLINT
|
||||
const StoredObject & object_from, const StoredObject & object_to, std::optional<ObjectAttributes> /* object_to_attributes */)
|
||||
{
|
||||
fs::path to = object_to.path;
|
||||
fs::path from = object_from.path;
|
||||
fs::path to = object_to.absolute_path;
|
||||
fs::path from = object_from.absolute_path;
|
||||
|
||||
/// Same logic as in DiskLocal.
|
||||
if (object_from.path.ends_with('/'))
|
||||
if (object_from.absolute_path.ends_with('/'))
|
||||
from = from.parent_path();
|
||||
if (fs::is_directory(from))
|
||||
to /= from.filename();
|
||||
|
@ -17,6 +17,8 @@ class LocalObjectStorage : public IObjectStorage
|
||||
public:
|
||||
LocalObjectStorage();
|
||||
|
||||
std::string getName() const override { return "LocalObjectStorage"; }
|
||||
|
||||
bool exists(const StoredObject & object) const override;
|
||||
|
||||
std::unique_ptr<ReadBufferFromFileBase> readObject( /// NOLINT
|
||||
|
@ -100,38 +100,33 @@ StoredObjects MetadataStorageFromLocalDisk::getStorageObjects(const std::string
|
||||
StoredObject MetadataStorageFromLocalDisk::createStorageObject(const std::string & blob_name) const
|
||||
{
|
||||
auto blob_path = fs::path(object_storage_root_path) / blob_name;
|
||||
StoredObject::CacheHintCreator cache_hint_creator;
|
||||
size_t object_size = 0;
|
||||
|
||||
StoredObject::PathKeyForCacheCreator path_key_for_cache_creator = [](const String & blob_path_) -> String
|
||||
{
|
||||
try
|
||||
{
|
||||
return toString(getINodeNumberFromPath(blob_path_));
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
LOG_DEBUG(
|
||||
&Poco::Logger::get("MetadataStorageFromLocalDisk"),
|
||||
"Object does not exist while getting cache path hint (object path: {})",
|
||||
blob_path_);
|
||||
|
||||
return "";
|
||||
}
|
||||
};
|
||||
|
||||
if (exists(blob_path))
|
||||
{
|
||||
object_size = getFileSize(blob_path);
|
||||
cache_hint_creator = [cache_hint = toString(getINodeNumberFromPath(blob_path))](const String &)
|
||||
{
|
||||
return cache_hint;
|
||||
};
|
||||
}
|
||||
else
|
||||
{
|
||||
cache_hint_creator = [](const String & blob_path_) -> String
|
||||
{
|
||||
try
|
||||
{
|
||||
return toString(getINodeNumberFromPath(blob_path_));
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
LOG_DEBUG(
|
||||
&Poco::Logger::get("MetadataStorageFromLocalDisk"),
|
||||
"Object does not exist while getting cache path hint (object path: {})",
|
||||
blob_path_);
|
||||
|
||||
return "";
|
||||
}
|
||||
};
|
||||
path_key_for_cache_creator =
|
||||
[path_key = path_key_for_cache_creator(blob_path)](const String &) { return path_key; };
|
||||
}
|
||||
|
||||
return StoredObject{blob_path, object_size, std::move(cache_hint_creator)};
|
||||
return StoredObject{blob_path, object_size, std::move(path_key_for_cache_creator)};
|
||||
}
|
||||
|
||||
uint32_t MetadataStorageFromLocalDisk::getHardlinkCount(const std::string & path) const
|
||||
@ -140,7 +135,7 @@ uint32_t MetadataStorageFromLocalDisk::getHardlinkCount(const std::string & path
|
||||
return disk->getRefCount(path);
|
||||
}
|
||||
|
||||
void MetadataStorageFromLocalDiskTransaction::writeStringToFile(const std::string & path, const std::string & data) /// NOLINT
|
||||
void MetadataStorageFromLocalDiskTransaction::writeStringToFile(const std::string & path, const std::string & data)
|
||||
{
|
||||
auto wb = disk->writeFile(path);
|
||||
wb->write(data.data(), data.size());
|
||||
|
@ -8,11 +8,11 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class MetadataStorageFromLocalDisk : public IMetadataStorage
|
||||
class MetadataStorageFromLocalDisk final : public IMetadataStorage
|
||||
{
|
||||
|
||||
public:
|
||||
explicit MetadataStorageFromLocalDisk(
|
||||
MetadataStorageFromLocalDisk(
|
||||
DiskPtr disk_,
|
||||
ObjectStoragePtr object_storage_,
|
||||
const std::string & object_storage_root_path_);
|
||||
@ -49,8 +49,6 @@ public:
|
||||
|
||||
StoredObject createStorageObject(const std::string & blob_name) const override;
|
||||
|
||||
std::string getObjectStoragePath() const override { return object_storage_root_path; }
|
||||
|
||||
private:
|
||||
DiskPtr disk;
|
||||
ObjectStoragePtr object_storage;
|
||||
@ -63,7 +61,7 @@ private:
|
||||
DiskPtr disk;
|
||||
|
||||
public:
|
||||
explicit MetadataStorageFromLocalDiskTransaction(const MetadataStorageFromLocalDisk & metadata_storage_, DiskPtr disk_)
|
||||
MetadataStorageFromLocalDiskTransaction(const MetadataStorageFromLocalDisk & metadata_storage_, DiskPtr disk_)
|
||||
: MetadataStorageFromDiskTransaction(metadata_storage_)
|
||||
, disk(disk_)
|
||||
{}
|
||||
|
@ -148,7 +148,7 @@ uint32_t MetadataStorageFromRemoteDisk::getHardlinkCount(const std::string & pat
|
||||
return metadata->getRefCount();
|
||||
}
|
||||
|
||||
void MetadataStorageFromRemoteDiskTransaction::writeStringToFile( /// NOLINT
|
||||
void MetadataStorageFromRemoteDiskTransaction::writeStringToFile(
|
||||
const std::string & path,
|
||||
const std::string & data)
|
||||
{
|
||||
@ -213,7 +213,7 @@ void MetadataStorageFromRemoteDiskTransaction::setReadOnly(const std::string & p
|
||||
void MetadataStorageFromRemoteDiskTransaction::createEmptyMetadataFile(const std::string & path)
|
||||
{
|
||||
auto metadata = std::make_unique<DiskObjectStorageMetadata>(
|
||||
metadata_storage_for_remote.getDisk()->getPath(), metadata_storage_for_remote.getObjectStoragePath(), path);
|
||||
metadata_storage_for_remote.getDisk()->getPath(), metadata_storage_for_remote.getObjectStorageRootPath(), path);
|
||||
|
||||
auto data = metadata->serializeToString();
|
||||
if (!data.empty())
|
||||
@ -223,7 +223,7 @@ void MetadataStorageFromRemoteDiskTransaction::createEmptyMetadataFile(const std
|
||||
void MetadataStorageFromRemoteDiskTransaction::createMetadataFile(const std::string & path, const std::string & blob_name, uint64_t size_in_bytes)
|
||||
{
|
||||
DiskObjectStorageMetadataPtr metadata = std::make_unique<DiskObjectStorageMetadata>(
|
||||
metadata_storage_for_remote.getDisk()->getPath(), metadata_storage_for_remote.getObjectStoragePath(), path);
|
||||
metadata_storage_for_remote.getDisk()->getPath(), metadata_storage_for_remote.getObjectStorageRootPath(), path);
|
||||
|
||||
metadata->addObject(blob_name, size_in_bytes);
|
||||
|
||||
|
@ -51,7 +51,7 @@ public:
|
||||
|
||||
uint32_t getHardlinkCount(const std::string & path) const override;
|
||||
|
||||
std::string getObjectStoragePath() const override { return object_storage_root_path; }
|
||||
std::string getObjectStorageRootPath() const { return object_storage_root_path; }
|
||||
|
||||
DiskPtr getDisk() const override { return disk; }
|
||||
|
||||
|
@ -103,7 +103,7 @@ Aws::S3::Model::HeadObjectOutcome S3ObjectStorage::requestObjectHeadData(const s
|
||||
|
||||
bool S3ObjectStorage::exists(const StoredObject & object) const
|
||||
{
|
||||
auto object_head = requestObjectHeadData(bucket, object.path);
|
||||
auto object_head = requestObjectHeadData(bucket, object.absolute_path);
|
||||
if (!object_head.IsSuccess())
|
||||
{
|
||||
if (object_head.GetError().GetErrorType() == Aws::S3::S3Errors::RESOURCE_NOT_FOUND)
|
||||
@ -122,12 +122,12 @@ String S3ObjectStorage::getCacheBasePath() const
|
||||
return cache->getBasePath();
|
||||
}
|
||||
|
||||
void S3ObjectStorage::removeCacheIfExists(const std::string & path)
|
||||
void S3ObjectStorage::removeCacheIfExists(const std::string & path_key)
|
||||
{
|
||||
if (!cache)
|
||||
if (!cache || path_key.empty())
|
||||
return;
|
||||
|
||||
IFileCache::Key key = cache->hash(path);
|
||||
IFileCache::Key key = cache->hash(path_key);
|
||||
cache->removeIfExists(key);
|
||||
}
|
||||
|
||||
@ -178,7 +178,7 @@ std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObject( /// NOLINT
|
||||
return std::make_unique<ReadBufferFromS3>(
|
||||
client.get(),
|
||||
bucket,
|
||||
object.path,
|
||||
object.absolute_path,
|
||||
version_id,
|
||||
settings_ptr->s3_settings.max_single_read_retries,
|
||||
read_settings);
|
||||
@ -204,7 +204,7 @@ std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLIN
|
||||
auto s3_buffer = std::make_unique<WriteBufferFromS3>(
|
||||
client.get(),
|
||||
bucket,
|
||||
object.path,
|
||||
object.absolute_path,
|
||||
settings_ptr->s3_settings,
|
||||
attributes,
|
||||
buf_size,
|
||||
@ -213,7 +213,7 @@ std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLIN
|
||||
|
||||
|
||||
return std::make_unique<WriteIndirectBufferFromRemoteFS>(
|
||||
std::move(s3_buffer), std::move(finalize_callback), object.path);
|
||||
std::move(s3_buffer), std::move(finalize_callback), object.absolute_path);
|
||||
}
|
||||
|
||||
void S3ObjectStorage::listPrefix(const std::string & path, RelativePathsWithSize & children) const
|
||||
@ -255,7 +255,7 @@ void S3ObjectStorage::removeObjectImpl(const StoredObject & object, bool if_exis
|
||||
{
|
||||
Aws::S3::Model::DeleteObjectRequest request;
|
||||
request.SetBucket(bucket);
|
||||
request.SetKey(object.path);
|
||||
request.SetKey(object.absolute_path);
|
||||
auto outcome = client_ptr->DeleteObject(request);
|
||||
|
||||
throwIfUnexpectedError(outcome, if_exists);
|
||||
@ -265,7 +265,7 @@ void S3ObjectStorage::removeObjectImpl(const StoredObject & object, bool if_exis
|
||||
/// TODO: For AWS we prefer to use multiobject operation even for single object
|
||||
/// maybe we shouldn't?
|
||||
Aws::S3::Model::ObjectIdentifier obj;
|
||||
obj.SetKey(object.path);
|
||||
obj.SetKey(object.absolute_path);
|
||||
Aws::S3::Model::Delete delkeys;
|
||||
delkeys.SetObjects({obj});
|
||||
Aws::S3::Model::DeleteObjectsRequest request;
|
||||
@ -302,12 +302,12 @@ void S3ObjectStorage::removeObjectsImpl(const StoredObjects & objects, bool if_e
|
||||
for (; current_position < objects.size() && current_chunk.size() < chunk_size_limit; ++current_position)
|
||||
{
|
||||
Aws::S3::Model::ObjectIdentifier obj;
|
||||
obj.SetKey(objects[current_position].path);
|
||||
obj.SetKey(objects[current_position].absolute_path);
|
||||
current_chunk.push_back(obj);
|
||||
|
||||
if (!keys.empty())
|
||||
keys += ", ";
|
||||
keys += objects[current_position].path;
|
||||
keys += objects[current_position].absolute_path;
|
||||
}
|
||||
|
||||
Aws::S3::Model::Delete delkeys;
|
||||
@ -364,10 +364,9 @@ void S3ObjectStorage::copyObjectToAnotherObjectStorage( // NOLINT
|
||||
std::optional<ObjectAttributes> object_to_attributes)
|
||||
{
|
||||
/// Shortcut for S3
|
||||
auto * dest_s3 = dynamic_cast<S3ObjectStorage * >(&object_storage_to);
|
||||
if (dest_s3)
|
||||
if (auto * dest_s3 = dynamic_cast<S3ObjectStorage * >(&object_storage_to); dest_s3 != nullptr)
|
||||
{
|
||||
copyObjectImpl(bucket, object_from.path, dest_s3->bucket, object_to.path, {}, object_to_attributes);
|
||||
copyObjectImpl(bucket, object_from.absolute_path, dest_s3->bucket, object_to.absolute_path, {}, object_to_attributes);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -489,18 +488,18 @@ void S3ObjectStorage::copyObjectMultipartImpl(
|
||||
void S3ObjectStorage::copyObject( // NOLINT
|
||||
const StoredObject & object_from, const StoredObject & object_to, std::optional<ObjectAttributes> object_to_attributes)
|
||||
{
|
||||
auto head = requestObjectHeadData(bucket, object_from.path).GetResult();
|
||||
auto head = requestObjectHeadData(bucket, object_from.absolute_path).GetResult();
|
||||
static constexpr int64_t multipart_upload_threashold = 5UL * 1024 * 1024 * 1024;
|
||||
|
||||
if (head.GetContentLength() >= multipart_upload_threashold)
|
||||
{
|
||||
copyObjectMultipartImpl(
|
||||
bucket, object_from.path, bucket, object_to.path, head, object_to_attributes);
|
||||
bucket, object_from.absolute_path, bucket, object_to.absolute_path, head, object_to_attributes);
|
||||
}
|
||||
else
|
||||
{
|
||||
copyObjectImpl(
|
||||
bucket, object_from.path, bucket, object_to.path, head, object_to_attributes);
|
||||
bucket, object_from.absolute_path, bucket, object_to.absolute_path, head, object_to_attributes);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -58,6 +58,8 @@ public:
|
||||
{
|
||||
}
|
||||
|
||||
std::string getName() const override { return "S3ObjectStorage"; }
|
||||
|
||||
bool exists(const StoredObject & object) const override;
|
||||
|
||||
std::unique_ptr<ReadBufferFromFileBase> readObject( /// NOLINT
|
||||
@ -126,9 +128,9 @@ public:
|
||||
const std::string & config_prefix,
|
||||
ContextPtr context) override;
|
||||
|
||||
bool isCached() const override { return true; }
|
||||
bool supportsCache() const override { return true; }
|
||||
|
||||
void removeCacheIfExists(const std::string & path) override;
|
||||
void removeCacheIfExists(const std::string & path_key) override;
|
||||
|
||||
String getCacheBasePath() const override;
|
||||
|
||||
|
@ -57,7 +57,7 @@ Pipe StorageSystemDisks::read(
|
||||
col_type->insert(toString(disk_ptr->getType()));
|
||||
|
||||
String cache_path;
|
||||
if (disk_ptr->isCached())
|
||||
if (disk_ptr->supportsCache())
|
||||
cache_path = disk_ptr->getCacheBasePath();
|
||||
|
||||
col_cache_path->insert(cache_path);
|
||||
|
@ -68,11 +68,11 @@ Pipe StorageSystemRemoteDataPaths::read(
|
||||
col_base_path->insert(disk->getPath());
|
||||
col_cache_base_path->insert(cache_base_path);
|
||||
col_local_path->insert(local_path);
|
||||
col_remote_path->insert(object.path);
|
||||
col_remote_path->insert(object.absolute_path);
|
||||
|
||||
if (cache)
|
||||
{
|
||||
auto cache_paths = cache->tryGetCachePaths(cache->hash(object.getCacheHint()));
|
||||
auto cache_paths = cache->tryGetCachePaths(cache->hash(object.getPathKeyForCache()));
|
||||
col_cache_paths->insert(Array(cache_paths.begin(), cache_paths.end()));
|
||||
}
|
||||
else
|
||||
|
Loading…
Reference in New Issue
Block a user