Merge with master

This commit is contained in:
alesapin 2022-05-25 21:47:05 +02:00
parent 6f5c86e55e
commit c7b16065e1
10 changed files with 103 additions and 25 deletions

View File

@ -627,8 +627,8 @@
M(656, MEILISEARCH_EXCEPTION) \
M(657, UNSUPPORTED_MEILISEARCH_TYPE) \
M(658, MEILISEARCH_MISSING_SOME_COLUMNS) \
M(659, HDFS_ERROR) \
M(659, UNKNOWN_STATUS_OF_TRANSACTION) \
M(660, HDFS_ERROR) \
\
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \

View File

@ -622,6 +622,17 @@ void DiskObjectStorage::restoreMetadataIfNeeded(const Poco::Util::AbstractConfig
}
}
/// Passes the given revision on to the metadata helper's syncRevision().
void DiskObjectStorage::syncRevision(UInt64 revision)
{
    auto & helper = *metadata_helper;
    helper.syncRevision(revision);
}
/// Returns the revision reported by the metadata helper's getRevision().
UInt64 DiskObjectStorage::getRevision() const
{
    const auto & helper = *metadata_helper;
    return helper.getRevision();
}
DiskPtr DiskObjectStorageReservation::getDisk(size_t i) const
{
if (i != 0)

View File

@ -172,6 +172,10 @@ public:
void restoreMetadataIfNeeded(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context);
void onFreeze(const String & path) override;
void syncRevision(UInt64 revision) override;
UInt64 getRevision() const override;
private:
const String name;
const String remote_fs_root_path;

View File

@ -23,7 +23,7 @@ static String revisionToString(UInt64 revision)
void DiskObjectStorageMetadataHelper::createFileOperationObject(const String & operation_name, UInt64 revision, const ObjectAttributes & metadata) const
{
const String path = disk->remote_fs_root_path + "operations/r" + revisionToString(revision) + "-" + operation_name;
const String path = disk->remote_fs_root_path + "operations/r" + revisionToString(revision) + operation_log_suffix + "-" + operation_name;
auto buf = disk->object_storage->writeObject(path, WriteMode::Rewrite, metadata);
buf->write('0');
buf->finalize();
@ -300,15 +300,45 @@ static String shrinkKey(const String & path, const String & key)
/// Splits an operation-log object key into {revision, operation name}.
/// The captured revision text is decoded through std::bitset<64>, i.e. it is read as a string of binary digits.
/// The result of FullMatch is not checked: if no revision was captured, the returned revision is 0.
/// NOTE(review): key_regexp is declared twice and FullMatch is called twice below — this looks like the
/// previous and the updated variant of the same statements shown together; confirm which pair is live.
/// NOTE(review): the pattern accepts any decimal digit (\d), while std::bitset's string constructor throws
/// std::invalid_argument on characters other than '0'/'1' — presumably keys are always written in binary; verify.
static std::tuple<UInt64, String> extractRevisionAndOperationFromKey(const String & key)
{
String revision_str;
/// Receives the optional middle capture group of the second pattern; it is not part of the returned tuple.
String suffix;
String operation;
/// Key has format: ../../r{revision}-{operation}
static const re2::RE2 key_regexp {".*/r(\\d+)-(\\w+)$"};
/// Key has format: ../../r{revision}(-{hostname})-{operation}
static const re2::RE2 key_regexp{".*/r(\\d+)(-[\\w\\d\\-\\.]+)?-(\\w+)$"};
re2::RE2::FullMatch(key, key_regexp, &revision_str, &operation);
re2::RE2::FullMatch(key, key_regexp, &revision_str, &suffix, &operation);
/// Empty revision text maps to revision 0; otherwise the binary digits are converted to an integer.
return {(revision_str.empty() ? 0 : static_cast<UInt64>(std::bitset<64>(revision_str).to_ullong())), operation};
}
void DiskObjectStorageMetadataHelper::moveRecursiveOrRemove(const String & from_path, const String & to_path, bool send_metadata)
{
if (disk->exists(to_path))
{
if (send_metadata)
{
auto revision = ++revision_counter;
const ObjectAttributes object_metadata {
{"from_path", from_path},
{"to_path", to_path}
};
createFileOperationObject("rename", revision, object_metadata);
}
if (disk->isDirectory(from_path))
{
for (auto it = disk->iterateDirectory(from_path); it->isValid(); it->next())
moveRecursiveOrRemove(it->path(), fs::path(to_path) / it->name(), false);
}
else
{
disk->removeFile(from_path);
}
}
else
{
disk->moveFile(from_path, to_path, send_metadata);
}
}
void DiskObjectStorageMetadataHelper::restoreFiles(IObjectStorage * source_object_storage, const RestoreInformation & restore_information)
{
LOG_INFO(disk->log, "Starting restore files for disk {}", disk->name);
@ -385,7 +415,6 @@ void DiskObjectStorageMetadataHelper::processRestoreFiles(IObjectStorage * sourc
else
continue;
disk->createDirectories(directoryPath(path));
auto relative_key = shrinkKey(source_path, key);
@ -457,7 +486,7 @@ void DiskObjectStorageMetadataHelper::restoreFileOperations(IObjectStorage * sou
auto to_path = object_attributes["to_path"];
if (disk->exists(from_path))
{
disk->moveFile(from_path, to_path, send_metadata);
moveRecursiveOrRemove(from_path, to_path, send_metadata);
LOG_TRACE(disk->log, "Revision {}. Restored rename {} -> {}", revision, from_path, to_path);

View File

@ -1,6 +1,7 @@
#pragma once
#include <Disks/ObjectStorages/IObjectStorage.h>
#include <base/getFQDNOrHostName.h>
namespace DB
{
@ -25,9 +26,37 @@ public:
DiskObjectStorageMetadataHelper(DiskObjectStorage * disk_, ReadSettings read_settings_)
: disk(disk_)
, read_settings(std::move(read_settings_))
, operation_log_suffix("-" + getFQDNOrHostName())
{
}
/// Most important method, called on DiskObjectStorage startup
void restore(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context);
/// Raises revision_counter to `revision` if it is currently smaller; never lowers it.
/// Safe against concurrent updates of the counter: the exchange is retried until it either
/// succeeds or the counter is observed to be at least `revision`.
void syncRevision(UInt64 revision)
{
    UInt64 local_revision = revision_counter.load();
    /// compare_exchange_weak may fail spuriously or because another thread changed the counter.
    /// On failure it reloads local_revision, so the bound is re-checked against the fresh value;
    /// the loop must therefore continue while the exchange FAILS, not while it succeeds.
    while (revision > local_revision && !revision_counter.compare_exchange_weak(local_revision, revision))
    {
    }
}
/// Returns the value currently held in revision_counter.
UInt64 getRevision() const
{
    const UInt64 current_revision = revision_counter.load();
    return current_revision;
}
static int readSchemaVersion(IObjectStorage * object_storage, const String & source_path);
void migrateToRestorableSchema();
void findLastRevision();
void createFileOperationObject(const String & operation_name, UInt64 revision, const ObjectAttributes & metadata) const;
/// Version with possibility to backup-restore metadata.
static constexpr int RESTORABLE_SCHEMA_VERSION = 1;
std::atomic<UInt64> revision_counter = 0;
private:
struct RestoreInformation
{
UInt64 revision = LATEST_REVISION;
@ -38,32 +67,24 @@ public:
using Futures = std::vector<std::future<void>>;
void createFileOperationObject(const String & operation_name, UInt64 revision, const ObjectAttributes & metadata) const;
/// Move file or files in directory when possible and remove files in other case
/// to restore by S3 operation log with same operations from different replicas
void moveRecursiveOrRemove(const String & from_path, const String & to_path, bool send_metadata);
void findLastRevision();
static int readSchemaVersion(IObjectStorage * object_storage, const String & source_path);
void saveSchemaVersion(const int & version) const;
void updateObjectMetadata(const String & key, const ObjectAttributes & metadata) const;
void migrateFileToRestorableSchema(const String & path) const;
void migrateToRestorableSchemaRecursive(const String & path, Futures & results);
void migrateToRestorableSchema();
/// Most important method, called on DiskObjectStorage startup
void restore(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context);
void readRestoreInformation(RestoreInformation & restore_information);
void restoreFiles(IObjectStorage * source_object_storage, const RestoreInformation & restore_information);
void processRestoreFiles(IObjectStorage * source_object_storage, const String & source_path, const std::vector<String> & keys) const;
void restoreFileOperations(IObjectStorage * source_object_storage, const RestoreInformation & restore_information);
std::atomic<UInt64> revision_counter = 0;
inline static const String RESTORE_FILE_NAME = "restore";
/// Object contains information about schema version.
inline static const String SCHEMA_VERSION_OBJECT = ".SCHEMA_VERSION";
/// Version with possibility to backup-restore metadata.
static constexpr int RESTORABLE_SCHEMA_VERSION = 1;
/// Directories with data.
const std::vector<String> data_roots {"data", "store"};
@ -72,6 +93,8 @@ public:
ObjectStoragePtr object_storage_from_another_namespace;
ReadSettings read_settings;
String operation_log_suffix;
};
}

View File

@ -14,9 +14,9 @@
#include <IO/S3Common.h>
#include <Disks/DiskCacheWrapper.h>
#include <Storages/StorageS3Settings.h>
#include <Disk/ObjectStorages/S3/ProxyConfiguration.h>
#include <Disk/ObjectStorages/S3/ProxyListConfiguration.h>
#include <Disk/ObjectStorages/S3/ProxyResolverConfiguration.h>
#include <Disks/ObjectStorages/S3/ProxyConfiguration.h>
#include <Disks/ObjectStorages/S3/ProxyListConfiguration.h>
#include <Disks/ObjectStorages/S3/ProxyResolverConfiguration.h>
#include <Disks/DiskRestartProxy.h>
#include <Disks/DiskLocal.h>
#include <Common/FileCacheFactory.h>
@ -149,4 +149,5 @@ std::unique_ptr<Aws::S3::S3Client> getClient(const Poco::Util::AbstractConfigura
}
}
>>>>>> master:src/Disks/S3/registerDiskS3.cpp
#endif

View File

@ -10,6 +10,7 @@
#include <aws/core/client/DefaultRetryStrategy.h>
#include <base/getFQDNOrHostName.h>
#include <Common/FileCacheFactory.h>
#include <Disks/DiskCacheWrapper.h>

View File

@ -518,7 +518,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
if (!disk)
disk = reservation->getDisk();
UInt64 revision = parse<UInt64>(in.getResponseCookie("disk_revision", "0"));
UInt64 revision = parse<UInt64>(in->getResponseCookie("disk_revision", "0"));
if (revision)
disk->syncRevision(revision);

View File

@ -239,7 +239,11 @@ private:
};
/// Constructs the iterator by creating an Impl with std::make_shared and storing it in pimpl;
/// all five arguments are forwarded to the Impl constructor unchanged.
/// NOTE(review): the parameter list appears twice below (once on a single line, once split per parameter) —
/// this looks like the previous and the reformatted variant shown together; confirm which one is live.
StorageS3Source::DisclosedGlobIterator::DisclosedGlobIterator(
const Aws::S3::S3Client & client_, const S3::URI & globbed_uri_, ASTPtr query, const Block & virtual_header, ContextPtr context)
const Aws::S3::S3Client & client_,
const S3::URI & globbed_uri_,
ASTPtr query,
const Block & virtual_header,
ContextPtr context)
: pimpl(std::make_shared<StorageS3Source::DisclosedGlobIterator::Impl>(client_, globbed_uri_, query, virtual_header, context))
{
}

View File

@ -36,7 +36,12 @@ public:
{
public:
DisclosedGlobIterator(
const Aws::S3::S3Client & client_, const S3::URI & globbed_uri_, ASTPtr query, const Block & virtual_header, ContextPtr context);
const Aws::S3::S3Client & client_,
const S3::URI & globbed_uri_,
ASTPtr query,
const Block & virtual_header,
ContextPtr context);
String next();
private: