mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-11 17:02:25 +00:00
Digging
This commit is contained in:
parent
ea389eeaca
commit
d8580c8cb8
@ -569,16 +569,6 @@ void DiskObjectStorage::startup()
|
|||||||
LOG_INFO(log, "Starting up disk {}", name);
|
LOG_INFO(log, "Starting up disk {}", name);
|
||||||
object_storage->startup();
|
object_storage->startup();
|
||||||
|
|
||||||
if (send_metadata)
|
|
||||||
{
|
|
||||||
metadata_helper->restore();
|
|
||||||
|
|
||||||
if (metadata_helper->readSchemaVersion(remote_fs_root_path) < DiskObjectStorageMetadataHelper::RESTORABLE_SCHEMA_VERSION)
|
|
||||||
metadata_helper->migrateToRestorableSchema();
|
|
||||||
|
|
||||||
metadata_helper->findLastRevision();
|
|
||||||
}
|
|
||||||
|
|
||||||
LOG_INFO(log, "Disk {} started up", name);
|
LOG_INFO(log, "Disk {} started up", name);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -674,6 +664,26 @@ void DiskObjectStorage::applyNewSettings(const Poco::Util::AbstractConfiguration
|
|||||||
object_storage->applyNewSettings(config, "storage_configuration.disks." + name, context_);
|
object_storage->applyNewSettings(config, "storage_configuration.disks." + name, context_);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void DiskObjectStorage::restoreMetadataIfNeeded(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context)
|
||||||
|
{
|
||||||
|
if (send_metadata)
|
||||||
|
{
|
||||||
|
LOG_DEBUG(log, "START RESTORING METADATA");
|
||||||
|
metadata_helper->restore(config, config_prefix, context);
|
||||||
|
|
||||||
|
if (metadata_helper->readSchemaVersion(object_storage.get(), remote_fs_root_path) < DiskObjectStorageMetadataHelper::RESTORABLE_SCHEMA_VERSION)
|
||||||
|
{
|
||||||
|
LOG_DEBUG(log, "DONE READING");
|
||||||
|
metadata_helper->migrateToRestorableSchema();
|
||||||
|
LOG_DEBUG(log, "MIGRATION FINISHED");
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG_DEBUG(log, "SEARCHING LAST REVISION");
|
||||||
|
metadata_helper->findLastRevision();
|
||||||
|
LOG_DEBUG(log, "DONE RESTORING METADATA");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
DiskPtr DiskObjectStorageReservation::getDisk(size_t i) const
|
DiskPtr DiskObjectStorageReservation::getDisk(size_t i) const
|
||||||
{
|
{
|
||||||
if (i != 0)
|
if (i != 0)
|
||||||
@ -750,14 +760,14 @@ void DiskObjectStorageMetadataHelper::findLastRevision()
|
|||||||
LOG_INFO(disk->log, "Found last revision number {} for disk {}", revision_counter, disk->name);
|
LOG_INFO(disk->log, "Found last revision number {} for disk {}", revision_counter, disk->name);
|
||||||
}
|
}
|
||||||
|
|
||||||
int DiskObjectStorageMetadataHelper::readSchemaVersion(const String & source_path) const
|
int DiskObjectStorageMetadataHelper::readSchemaVersion(IObjectStorage * object_storage, const String & source_path) const
|
||||||
{
|
{
|
||||||
const std::string path = source_path + SCHEMA_VERSION_OBJECT;
|
const std::string path = source_path + SCHEMA_VERSION_OBJECT;
|
||||||
int version = 0;
|
int version = 0;
|
||||||
if (!disk->object_storage->exists(path))
|
if (!object_storage->exists(path))
|
||||||
return version;
|
return version;
|
||||||
|
|
||||||
auto buf = disk->object_storage->readObject(path);
|
auto buf = object_storage->readObject(path);
|
||||||
readIntText(version, *buf);
|
readIntText(version, *buf);
|
||||||
|
|
||||||
return version;
|
return version;
|
||||||
@ -800,20 +810,22 @@ void DiskObjectStorageMetadataHelper::migrateToRestorableSchemaRecursive(const S
|
|||||||
|
|
||||||
bool dir_contains_only_files = true;
|
bool dir_contains_only_files = true;
|
||||||
for (auto it = disk->iterateDirectory(path); it->isValid(); it->next())
|
for (auto it = disk->iterateDirectory(path); it->isValid(); it->next())
|
||||||
|
{
|
||||||
if (disk->isDirectory(it->path()))
|
if (disk->isDirectory(it->path()))
|
||||||
{
|
{
|
||||||
dir_contains_only_files = false;
|
dir_contains_only_files = false;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// The whole directory can be migrated asynchronously.
|
/// The whole directory can be migrated asynchronously.
|
||||||
if (dir_contains_only_files)
|
if (dir_contains_only_files)
|
||||||
{
|
{
|
||||||
auto result = disk->getExecutor().execute([this, path]
|
auto result = disk->getExecutor().execute([this, path]
|
||||||
{
|
{
|
||||||
for (auto it = disk->iterateDirectory(path); it->isValid(); it->next())
|
for (auto it = disk->iterateDirectory(path); it->isValid(); it->next())
|
||||||
migrateFileToRestorableSchema(it->path());
|
migrateFileToRestorableSchema(it->path());
|
||||||
});
|
});
|
||||||
|
|
||||||
results.push_back(std::move(result));
|
results.push_back(std::move(result));
|
||||||
}
|
}
|
||||||
@ -863,15 +875,18 @@ void DiskObjectStorageMetadataHelper::migrateToRestorableSchema()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void DiskObjectStorageMetadataHelper::restore()
|
void DiskObjectStorageMetadataHelper::restore(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context)
|
||||||
{
|
{
|
||||||
if (!disk->exists(RESTORE_FILE_NAME))
|
if (!disk->exists(RESTORE_FILE_NAME))
|
||||||
|
{
|
||||||
return;
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
RestoreInformation information;
|
RestoreInformation information;
|
||||||
information.source_path = disk->remote_fs_root_path;
|
information.source_path = disk->remote_fs_root_path;
|
||||||
|
information.source_namespace = disk->object_storage->getObjectsNamespace();
|
||||||
|
|
||||||
readRestoreInformation(information);
|
readRestoreInformation(information);
|
||||||
if (information.revision == 0)
|
if (information.revision == 0)
|
||||||
@ -879,19 +894,28 @@ void DiskObjectStorageMetadataHelper::restore()
|
|||||||
if (!information.source_path.ends_with('/'))
|
if (!information.source_path.ends_with('/'))
|
||||||
information.source_path += '/';
|
information.source_path += '/';
|
||||||
|
|
||||||
/// In this case we need to additionally cleanup S3 from objects with later revision.
|
IObjectStorage * source_object_storage = disk->object_storage.get();
|
||||||
/// Will be simply just restore to different path.
|
if (information.source_namespace == disk->object_storage->getObjectsNamespace())
|
||||||
if (information.source_path == disk->remote_fs_root_path && information.revision != LATEST_REVISION)
|
{
|
||||||
throw Exception("Restoring to the same bucket and path is allowed if revision is latest (0)", ErrorCodes::BAD_ARGUMENTS);
|
/// In this case we need to additionally cleanup S3 from objects with later revision.
|
||||||
|
/// Will be simply just restore to different path.
|
||||||
|
if (information.source_path == disk->remote_fs_root_path && information.revision != LATEST_REVISION)
|
||||||
|
throw Exception("Restoring to the same bucket and path is allowed if revision is latest (0)", ErrorCodes::BAD_ARGUMENTS);
|
||||||
|
|
||||||
/// This case complicates S3 cleanup in case of unsuccessful restore.
|
/// This case complicates S3 cleanup in case of unsuccessful restore.
|
||||||
if (information.source_path != disk->remote_fs_root_path && disk->remote_fs_root_path.starts_with(information.source_path))
|
if (information.source_path != disk->remote_fs_root_path && disk->remote_fs_root_path.starts_with(information.source_path))
|
||||||
throw Exception("Restoring to the same bucket is allowed only if source path is not a sub-path of configured path in S3 disk", ErrorCodes::BAD_ARGUMENTS);
|
throw Exception("Restoring to the same bucket is allowed only if source path is not a sub-path of configured path in S3 disk", ErrorCodes::BAD_ARGUMENTS);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
object_storage_from_another_namespace = disk->object_storage->cloneObjectStorage(information.source_namespace, config, config_prefix, context);
|
||||||
|
source_object_storage = object_storage_from_another_namespace.get();
|
||||||
|
}
|
||||||
|
|
||||||
LOG_INFO(disk->log, "Starting to restore disk {}. Revision: {}, Source path: {}",
|
LOG_INFO(disk->log, "Starting to restore disk {}. Revision: {}, Source path: {}",
|
||||||
disk->name, information.revision, information.source_path);
|
disk->name, information.revision, information.source_path);
|
||||||
|
|
||||||
if (readSchemaVersion(information.source_path) < RESTORABLE_SCHEMA_VERSION)
|
if (readSchemaVersion(source_object_storage, information.source_path) < RESTORABLE_SCHEMA_VERSION)
|
||||||
throw Exception("Source bucket doesn't have restorable schema.", ErrorCodes::BAD_ARGUMENTS);
|
throw Exception("Source bucket doesn't have restorable schema.", ErrorCodes::BAD_ARGUMENTS);
|
||||||
|
|
||||||
LOG_INFO(disk->log, "Removing old metadata...");
|
LOG_INFO(disk->log, "Removing old metadata...");
|
||||||
@ -901,8 +925,8 @@ void DiskObjectStorageMetadataHelper::restore()
|
|||||||
if (disk->exists(root))
|
if (disk->exists(root))
|
||||||
disk->removeSharedRecursive(root + '/', !cleanup_s3, {});
|
disk->removeSharedRecursive(root + '/', !cleanup_s3, {});
|
||||||
|
|
||||||
restoreFiles(information);
|
restoreFiles(source_object_storage, information);
|
||||||
restoreFileOperations(information);
|
restoreFileOperations(source_object_storage, information);
|
||||||
|
|
||||||
disk->metadata_disk->removeFile(RESTORE_FILE_NAME);
|
disk->metadata_disk->removeFile(RESTORE_FILE_NAME);
|
||||||
|
|
||||||
@ -949,10 +973,12 @@ void DiskObjectStorageMetadataHelper::readRestoreInformation(RestoreInformation
|
|||||||
|
|
||||||
for (const auto & [key, value] : properties)
|
for (const auto & [key, value] : properties)
|
||||||
{
|
{
|
||||||
ReadBufferFromString value_buffer (value);
|
ReadBufferFromString value_buffer(value);
|
||||||
|
|
||||||
if (key == "revision")
|
if (key == "revision")
|
||||||
readIntText(restore_information.revision, value_buffer);
|
readIntText(restore_information.revision, value_buffer);
|
||||||
|
else if (key == "source_bucket" || key == "source_namespace")
|
||||||
|
readText(restore_information.source_namespace, value_buffer);
|
||||||
else if (key == "source_path")
|
else if (key == "source_path")
|
||||||
readText(restore_information.source_path, value_buffer);
|
readText(restore_information.source_path, value_buffer);
|
||||||
else if (key == "detached")
|
else if (key == "detached")
|
||||||
@ -988,12 +1014,12 @@ static std::tuple<UInt64, String> extractRevisionAndOperationFromKey(const Strin
|
|||||||
return {(revision_str.empty() ? 0 : static_cast<UInt64>(std::bitset<64>(revision_str).to_ullong())), operation};
|
return {(revision_str.empty() ? 0 : static_cast<UInt64>(std::bitset<64>(revision_str).to_ullong())), operation};
|
||||||
}
|
}
|
||||||
|
|
||||||
void DiskObjectStorageMetadataHelper::restoreFiles(const RestoreInformation & restore_information)
|
void DiskObjectStorageMetadataHelper::restoreFiles(IObjectStorage * source_object_storage, const RestoreInformation & restore_information)
|
||||||
{
|
{
|
||||||
LOG_INFO(disk->log, "Starting restore files for disk {}", disk->name);
|
LOG_INFO(disk->log, "Starting restore files for disk {}", disk->name);
|
||||||
|
|
||||||
std::vector<std::future<void>> results;
|
std::vector<std::future<void>> results;
|
||||||
auto restore_files = [this, &restore_information, &results](const BlobsPathToSize & keys)
|
auto restore_files = [this, &source_object_storage, &restore_information, &results](const BlobsPathToSize & keys)
|
||||||
{
|
{
|
||||||
std::vector<String> keys_names;
|
std::vector<String> keys_names;
|
||||||
for (const auto & [key, size] : keys)
|
for (const auto & [key, size] : keys)
|
||||||
@ -1012,9 +1038,9 @@ void DiskObjectStorageMetadataHelper::restoreFiles(const RestoreInformation & re
|
|||||||
|
|
||||||
if (!keys_names.empty())
|
if (!keys_names.empty())
|
||||||
{
|
{
|
||||||
auto result = disk->getExecutor().execute([this, &restore_information, keys_names]()
|
auto result = disk->getExecutor().execute([this, &source_object_storage, &restore_information, keys_names]()
|
||||||
{
|
{
|
||||||
processRestoreFiles(restore_information.source_path, keys_names);
|
processRestoreFiles(source_object_storage, restore_information.source_path, keys_names);
|
||||||
});
|
});
|
||||||
|
|
||||||
results.push_back(std::move(result));
|
results.push_back(std::move(result));
|
||||||
@ -1024,7 +1050,7 @@ void DiskObjectStorageMetadataHelper::restoreFiles(const RestoreInformation & re
|
|||||||
};
|
};
|
||||||
|
|
||||||
BlobsPathToSize children;
|
BlobsPathToSize children;
|
||||||
disk->object_storage->listPrefix(restore_information.source_path, children);
|
source_object_storage->listPrefix(restore_information.source_path, children);
|
||||||
restore_files(children);
|
restore_files(children);
|
||||||
|
|
||||||
for (auto & result : results)
|
for (auto & result : results)
|
||||||
@ -1036,11 +1062,11 @@ void DiskObjectStorageMetadataHelper::restoreFiles(const RestoreInformation & re
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void DiskObjectStorageMetadataHelper::processRestoreFiles(const String & source_path, std::vector<String> keys)
|
void DiskObjectStorageMetadataHelper::processRestoreFiles(IObjectStorage * source_object_storage, const String & source_path, const std::vector<String> & keys)
|
||||||
{
|
{
|
||||||
for (const auto & key : keys)
|
for (const auto & key : keys)
|
||||||
{
|
{
|
||||||
auto meta = disk->object_storage->getObjectMetadata(key);
|
auto meta = source_object_storage->getObjectMetadata(key);
|
||||||
auto object_attributes = meta.attributes;
|
auto object_attributes = meta.attributes;
|
||||||
|
|
||||||
String path;
|
String path;
|
||||||
@ -1066,7 +1092,7 @@ void DiskObjectStorageMetadataHelper::processRestoreFiles(const String & source_
|
|||||||
|
|
||||||
/// Copy object if we restore to different bucket / path.
|
/// Copy object if we restore to different bucket / path.
|
||||||
if (disk->remote_fs_root_path != source_path)
|
if (disk->remote_fs_root_path != source_path)
|
||||||
disk->object_storage->copyObject(key, disk->remote_fs_root_path + relative_key);
|
source_object_storage->copyObjectToAnotherObjectStorage(key, disk->remote_fs_root_path + relative_key, *disk->object_storage);
|
||||||
|
|
||||||
auto updater = [relative_key, meta] (DiskObjectStorage::Metadata & metadata)
|
auto updater = [relative_key, meta] (DiskObjectStorage::Metadata & metadata)
|
||||||
{
|
{
|
||||||
@ -1088,13 +1114,13 @@ static String pathToDetached(const String & source_path)
|
|||||||
return fs::path(source_path).parent_path() / "detached/";
|
return fs::path(source_path).parent_path() / "detached/";
|
||||||
}
|
}
|
||||||
|
|
||||||
void DiskObjectStorageMetadataHelper::restoreFileOperations(const RestoreInformation & restore_information)
|
void DiskObjectStorageMetadataHelper::restoreFileOperations(IObjectStorage * source_object_storage, const RestoreInformation & restore_information)
|
||||||
{
|
{
|
||||||
/// Enable recording file operations if we restore to different bucket / path.
|
/// Enable recording file operations if we restore to different bucket / path.
|
||||||
bool send_metadata = disk->remote_fs_root_path != restore_information.source_path;
|
bool send_metadata = source_object_storage->getObjectsNamespace() != disk->object_storage->getObjectsNamespace() || disk->remote_fs_root_path != restore_information.source_path;
|
||||||
|
|
||||||
std::set<String> renames;
|
std::set<String> renames;
|
||||||
auto restore_file_operations = [this, &restore_information, &renames, &send_metadata](const BlobsPathToSize & keys)
|
auto restore_file_operations = [this, &source_object_storage, &restore_information, &renames, &send_metadata](const BlobsPathToSize & keys)
|
||||||
{
|
{
|
||||||
const String rename = "rename";
|
const String rename = "rename";
|
||||||
const String hardlink = "hardlink";
|
const String hardlink = "hardlink";
|
||||||
@ -1117,7 +1143,7 @@ void DiskObjectStorageMetadataHelper::restoreFileOperations(const RestoreInforma
|
|||||||
if (send_metadata)
|
if (send_metadata)
|
||||||
revision_counter = revision - 1;
|
revision_counter = revision - 1;
|
||||||
|
|
||||||
auto object_attributes = *(disk->object_storage->getObjectMetadata(key).attributes);
|
auto object_attributes = *(source_object_storage->getObjectMetadata(key).attributes);
|
||||||
if (operation == rename)
|
if (operation == rename)
|
||||||
{
|
{
|
||||||
auto from_path = object_attributes["from_path"];
|
auto from_path = object_attributes["from_path"];
|
||||||
@ -1180,7 +1206,7 @@ void DiskObjectStorageMetadataHelper::restoreFileOperations(const RestoreInforma
|
|||||||
};
|
};
|
||||||
|
|
||||||
BlobsPathToSize children;
|
BlobsPathToSize children;
|
||||||
disk->object_storage->listPrefix(restore_information.source_path + "operations/", children);
|
source_object_storage->listPrefix(restore_information.source_path + "operations/", children);
|
||||||
restore_file_operations(children);
|
restore_file_operations(children);
|
||||||
|
|
||||||
if (restore_information.detached)
|
if (restore_information.detached)
|
||||||
@ -1224,5 +1250,4 @@ void DiskObjectStorageMetadataHelper::restoreFileOperations(const RestoreInforma
|
|||||||
LOG_INFO(disk->log, "File operations restored for disk {}", disk->name);
|
LOG_INFO(disk->log, "File operations restored for disk {}", disk->name);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -164,6 +164,7 @@ public:
|
|||||||
|
|
||||||
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context_, const String &, const DisksMap &) override;
|
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context_, const String &, const DisksMap &) override;
|
||||||
|
|
||||||
|
void restoreMetadataIfNeeded(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context);
|
||||||
private:
|
private:
|
||||||
const String name;
|
const String name;
|
||||||
const String remote_fs_root_path;
|
const String remote_fs_root_path;
|
||||||
@ -284,6 +285,7 @@ public:
|
|||||||
struct RestoreInformation
|
struct RestoreInformation
|
||||||
{
|
{
|
||||||
UInt64 revision = LATEST_REVISION;
|
UInt64 revision = LATEST_REVISION;
|
||||||
|
String source_namespace;
|
||||||
String source_path;
|
String source_path;
|
||||||
bool detached = false;
|
bool detached = false;
|
||||||
};
|
};
|
||||||
@ -293,18 +295,18 @@ public:
|
|||||||
void createFileOperationObject(const String & operation_name, UInt64 revision, const ObjectAttributes & metadata) const;
|
void createFileOperationObject(const String & operation_name, UInt64 revision, const ObjectAttributes & metadata) const;
|
||||||
void findLastRevision();
|
void findLastRevision();
|
||||||
|
|
||||||
int readSchemaVersion(const String & source_path) const;
|
int readSchemaVersion(IObjectStorage * object_storage, const String & source_path) const;
|
||||||
void saveSchemaVersion(const int & version) const;
|
void saveSchemaVersion(const int & version) const;
|
||||||
void updateObjectMetadata(const String & key, const ObjectAttributes & metadata) const;
|
void updateObjectMetadata(const String & key, const ObjectAttributes & metadata) const;
|
||||||
void migrateFileToRestorableSchema(const String & path) const;
|
void migrateFileToRestorableSchema(const String & path) const;
|
||||||
void migrateToRestorableSchemaRecursive(const String & path, Futures & results);
|
void migrateToRestorableSchemaRecursive(const String & path, Futures & results);
|
||||||
void migrateToRestorableSchema();
|
void migrateToRestorableSchema();
|
||||||
|
|
||||||
void restore();
|
void restore(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context);
|
||||||
void readRestoreInformation(RestoreInformation & restore_information);
|
void readRestoreInformation(RestoreInformation & restore_information);
|
||||||
void restoreFiles(const RestoreInformation & restore_information);
|
void restoreFiles(IObjectStorage * source_object_storage, const RestoreInformation & restore_information);
|
||||||
void processRestoreFiles(const String & source_path, std::vector<String> keys);
|
void processRestoreFiles(IObjectStorage * source_object_storage, const String & source_path, const std::vector<String> & keys);
|
||||||
void restoreFileOperations(const RestoreInformation & restore_information);
|
void restoreFileOperations(IObjectStorage * source_object_storage, const RestoreInformation & restore_information);
|
||||||
|
|
||||||
std::atomic<UInt64> revision_counter = 0;
|
std::atomic<UInt64> revision_counter = 0;
|
||||||
inline static const String RESTORE_FILE_NAME = "restore";
|
inline static const String RESTORE_FILE_NAME = "restore";
|
||||||
@ -318,6 +320,8 @@ public:
|
|||||||
|
|
||||||
DiskObjectStorage * disk;
|
DiskObjectStorage * disk;
|
||||||
|
|
||||||
|
ObjectStoragePtr object_storage_from_another_namespace;
|
||||||
|
|
||||||
ReadSettings read_settings;
|
ReadSettings read_settings;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
#include <Disks/IObjectStorage.h>
|
#include <Disks/IObjectStorage.h>
|
||||||
#include <Disks/IO/ThreadPoolRemoteFSReader.h>
|
#include <Disks/IO/ThreadPoolRemoteFSReader.h>
|
||||||
|
#include <IO/copyData.h>
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
@ -34,4 +35,15 @@ void IObjectStorage::removeFromCache(const std::string & path)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void IObjectStorage::copyObjectToAnotherObjectStorage(const std::string & object_from, const std::string & object_to, IObjectStorage & object_storage_to, std::optional<ObjectAttributes> object_to_attributes)
|
||||||
|
{
|
||||||
|
if (&object_storage_to == this)
|
||||||
|
copyObject(object_from, object_to, object_to_attributes);
|
||||||
|
|
||||||
|
auto in = readObject(object_from);
|
||||||
|
auto out = object_storage_to.writeObject(object_to);
|
||||||
|
copyData(*in, *out);
|
||||||
|
out->finalize();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -97,6 +97,8 @@ public:
|
|||||||
|
|
||||||
virtual void copyObject(const std::string & object_from, const std::string & object_to, std::optional<ObjectAttributes> object_to_attributes = {}) = 0;
|
virtual void copyObject(const std::string & object_from, const std::string & object_to, std::optional<ObjectAttributes> object_to_attributes = {}) = 0;
|
||||||
|
|
||||||
|
virtual void copyObjectToAnotherObjectStorage(const std::string & object_from, const std::string & object_to, IObjectStorage & object_storage_to, std::optional<ObjectAttributes> object_to_attributes = {});
|
||||||
|
|
||||||
virtual ~IObjectStorage() = default;
|
virtual ~IObjectStorage() = default;
|
||||||
|
|
||||||
std::string getCacheBasePath() const;
|
std::string getCacheBasePath() const;
|
||||||
@ -113,6 +115,10 @@ public:
|
|||||||
|
|
||||||
virtual void applyNewSettings(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context) = 0;
|
virtual void applyNewSettings(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context) = 0;
|
||||||
|
|
||||||
|
virtual String getObjectsNamespace() const = 0;
|
||||||
|
|
||||||
|
virtual std::unique_ptr<IObjectStorage> cloneObjectStorage(const std::string & new_namespace, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context) = 0;
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
FileCachePtr cache;
|
FileCachePtr cache;
|
||||||
};
|
};
|
||||||
|
@ -79,7 +79,7 @@ void registerDiskS3(DiskFactory & factory)
|
|||||||
getSettings(config, config_prefix, context),
|
getSettings(config, config_prefix, context),
|
||||||
uri.version_id, uri.bucket);
|
uri.version_id, uri.bucket);
|
||||||
|
|
||||||
std::shared_ptr<IDisk> s3disk = std::make_shared<DiskObjectStorage>(
|
std::shared_ptr<DiskObjectStorage> s3disk = std::make_shared<DiskObjectStorage>(
|
||||||
name,
|
name,
|
||||||
uri.key,
|
uri.key,
|
||||||
"DiskS3",
|
"DiskS3",
|
||||||
@ -98,6 +98,9 @@ void registerDiskS3(DiskFactory & factory)
|
|||||||
|
|
||||||
s3disk->startup();
|
s3disk->startup();
|
||||||
|
|
||||||
|
s3disk->restoreMetadataIfNeeded(config, config_prefix, context);
|
||||||
|
|
||||||
|
std::shared_ptr<IDisk> disk_result = s3disk;
|
||||||
|
|
||||||
#ifdef NDEBUG
|
#ifdef NDEBUG
|
||||||
bool use_cache = true;
|
bool use_cache = true;
|
||||||
@ -110,10 +113,11 @@ void registerDiskS3(DiskFactory & factory)
|
|||||||
if (config.getBool(config_prefix + ".cache_enabled", use_cache))
|
if (config.getBool(config_prefix + ".cache_enabled", use_cache))
|
||||||
{
|
{
|
||||||
String cache_path = config.getString(config_prefix + ".cache_path", context->getPath() + "disks/" + name + "/cache/");
|
String cache_path = config.getString(config_prefix + ".cache_path", context->getPath() + "disks/" + name + "/cache/");
|
||||||
s3disk = wrapWithCache(s3disk, "s3-cache", cache_path, metadata_path);
|
disk_result = wrapWithCache(disk_result, "s3-cache", cache_path, metadata_path);
|
||||||
}
|
}
|
||||||
|
|
||||||
return std::make_shared<DiskRestartProxy>(s3disk);
|
LOG_DEBUG(&Poco::Logger::get("DEBUG"), "DONE DISK");
|
||||||
|
return std::make_shared<DiskRestartProxy>(disk_result);
|
||||||
};
|
};
|
||||||
factory.registerDiskType("s3", creator);
|
factory.registerDiskType("s3", creator);
|
||||||
}
|
}
|
||||||
|
@ -81,11 +81,15 @@ bool S3ObjectStorage::exists(const std::string & path) const
|
|||||||
auto object_head = requestObjectHeadData(bucket, path);
|
auto object_head = requestObjectHeadData(bucket, path);
|
||||||
if (!object_head.IsSuccess())
|
if (!object_head.IsSuccess())
|
||||||
{
|
{
|
||||||
if (object_head.GetError().GetErrorType() == Aws::S3::S3Errors::NO_SUCH_KEY)
|
if (object_head.GetError().GetErrorType() == Aws::S3::S3Errors::RESOURCE_NOT_FOUND)
|
||||||
|
{
|
||||||
|
LOG_DEBUG(&Poco::Logger::get("DEBUG"), "OBJECT DOESNT {} EXISTS", path);
|
||||||
return false;
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
throwIfError(object_head);
|
throwIfError(object_head);
|
||||||
}
|
}
|
||||||
|
LOG_DEBUG(&Poco::Logger::get("DEBUG"), "OBJECT {} EXISTS", path);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -291,6 +295,15 @@ ObjectMetadata S3ObjectStorage::getObjectMetadata(const std::string & path) cons
|
|||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void S3ObjectStorage::copyObjectToAnotherObjectStorage(const std::string & object_from, const std::string & object_to, IObjectStorage & object_storage_to, std::optional<ObjectAttributes> object_to_attributes)
|
||||||
|
{
|
||||||
|
/// Shortcut for S3
|
||||||
|
if (auto * dest_s3 = dynamic_cast<S3ObjectStorage * >(&object_storage_to); dest_s3 != nullptr)
|
||||||
|
copyObjectImpl(bucket, object_from, dest_s3->bucket, object_to, {}, object_to_attributes);
|
||||||
|
else
|
||||||
|
IObjectStorage::copyObjectToAnotherObjectStorage(object_from, object_to, object_storage_to, object_to_attributes);
|
||||||
|
}
|
||||||
|
|
||||||
void S3ObjectStorage::copyObjectImpl(const String & src_bucket, const String & src_key, const String & dst_bucket, const String & dst_key,
|
void S3ObjectStorage::copyObjectImpl(const String & src_bucket, const String & src_key, const String & dst_bucket, const String & dst_key,
|
||||||
std::optional<Aws::S3::Model::HeadObjectResult> head,
|
std::optional<Aws::S3::Model::HeadObjectResult> head,
|
||||||
std::optional<ObjectAttributes> metadata) const
|
std::optional<ObjectAttributes> metadata) const
|
||||||
@ -428,7 +441,7 @@ void S3ObjectStorage::startup()
|
|||||||
auto client_ptr = client.get();
|
auto client_ptr = client.get();
|
||||||
|
|
||||||
/// Need to be enabled if it was disabled during shutdown() call.
|
/// Need to be enabled if it was disabled during shutdown() call.
|
||||||
const_cast<Aws::S3::S3Client &>(*client_ptr.get()).EnableRequestProcessing();
|
const_cast<Aws::S3::S3Client &>(*client_ptr).EnableRequestProcessing();
|
||||||
}
|
}
|
||||||
|
|
||||||
void S3ObjectStorage::applyNewSettings(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context)
|
void S3ObjectStorage::applyNewSettings(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context)
|
||||||
@ -437,6 +450,15 @@ void S3ObjectStorage::applyNewSettings(const Poco::Util::AbstractConfiguration &
|
|||||||
client.set(getClient(config, config_prefix, context));
|
client.set(getClient(config, config_prefix, context));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::unique_ptr<IObjectStorage> S3ObjectStorage::cloneObjectStorage(const std::string & new_namespace, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context)
|
||||||
|
{
|
||||||
|
return std::make_unique<S3ObjectStorage>(
|
||||||
|
nullptr, getClient(config, config_prefix, context),
|
||||||
|
getSettings(config, config_prefix, context),
|
||||||
|
version_id, new_namespace);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
@ -17,7 +17,6 @@ namespace DB
|
|||||||
|
|
||||||
struct S3ObjectStorageSettings
|
struct S3ObjectStorageSettings
|
||||||
{
|
{
|
||||||
|
|
||||||
S3ObjectStorageSettings() = default;
|
S3ObjectStorageSettings() = default;
|
||||||
|
|
||||||
S3ObjectStorageSettings(
|
S3ObjectStorageSettings(
|
||||||
@ -95,9 +94,7 @@ public:
|
|||||||
|
|
||||||
void copyObject(const std::string & object_from, const std::string & object_to, std::optional<ObjectAttributes> object_to_attributes = {}) override;
|
void copyObject(const std::string & object_from, const std::string & object_to, std::optional<ObjectAttributes> object_to_attributes = {}) override;
|
||||||
|
|
||||||
void setNewSettings(std::unique_ptr<S3ObjectStorageSettings> && s3_settings_);
|
void copyObjectToAnotherObjectStorage(const std::string & object_from, const std::string & object_to, IObjectStorage & object_storage_to, std::optional<ObjectAttributes> object_to_attributes = {}) override;
|
||||||
|
|
||||||
void setNewClient(std::unique_ptr<Aws::S3::S3Client> && client_);
|
|
||||||
|
|
||||||
void shutdown() override;
|
void shutdown() override;
|
||||||
|
|
||||||
@ -105,7 +102,13 @@ public:
|
|||||||
|
|
||||||
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context) override;
|
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context) override;
|
||||||
|
|
||||||
|
String getObjectsNamespace() const override { return bucket; }
|
||||||
|
|
||||||
|
std::unique_ptr<IObjectStorage> cloneObjectStorage(const std::string & new_namespace, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context) override;
|
||||||
private:
|
private:
|
||||||
|
void setNewSettings(std::unique_ptr<S3ObjectStorageSettings> && s3_settings_);
|
||||||
|
|
||||||
|
void setNewClient(std::unique_ptr<Aws::S3::S3Client> && client_);
|
||||||
|
|
||||||
void copyObjectImpl(const String & src_bucket, const String & src_key, const String & dst_bucket, const String & dst_key,
|
void copyObjectImpl(const String & src_bucket, const String & src_key, const String & dst_bucket, const String & dst_key,
|
||||||
std::optional<Aws::S3::Model::HeadObjectResult> head = std::nullopt,
|
std::optional<Aws::S3::Model::HeadObjectResult> head = std::nullopt,
|
||||||
|
Loading…
Reference in New Issue
Block a user