mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 00:30:49 +00:00
Merge pull request #61116 from jkartseva/s3-plain-with-replace
S3-plain based disk supporting directory rename
This commit is contained in:
commit
90d8e3cb97
@ -769,6 +769,7 @@ In addition to local block devices, ClickHouse supports these storage types:
|
||||
- [`web` for read-only from web](#web-storage)
|
||||
- [`cache` for local caching](/docs/en/operations/storing-data.md/#using-local-cache)
|
||||
- [`s3_plain` for backups to S3](/docs/en/operations/backup#backuprestore-using-an-s3-disk)
|
||||
- [`s3_plain_rewritable` for immutable, non-replicated tables in S3](/docs/en/operations/storing-data.md#s3-plain-rewritable-storage)
|
||||
|
||||
## Using Multiple Block Devices for Data Storage {#table_engine-mergetree-multiple-volumes}
|
||||
|
||||
|
@ -28,7 +28,7 @@ Starting from 24.1 clickhouse version, it is possible to use a new configuration
|
||||
It requires specifying:
|
||||
1. `type` equal to `object_storage`
|
||||
2. `object_storage_type`, equal to one of `s3`, `azure_blob_storage` (or just `azure` from `24.3`), `hdfs`, `local_blob_storage` (or just `local` from `24.3`), `web`.
|
||||
Optionally, `metadata_type` can be specified (it is equal to `local` by default), but it can also be set to `plain`, `web`.
|
||||
Optionally, `metadata_type` can be specified (it is equal to `local` by default), but it can also be set to `plain`, `web` and, starting from `24.4`, `plain_rewritable`.
|
||||
Usage of the `plain` metadata type is described in the [plain storage section](/docs/en/operations/storing-data.md/#storing-data-on-webserver); the `web` metadata type can be used only with the `web` object storage type; the `local` metadata type stores metadata files locally (each metadata file contains a mapping to files in object storage and some additional meta information about them).
|
||||
|
||||
E.g. configuration option
|
||||
@ -341,6 +341,36 @@ Configuration:
|
||||
</s3_plain>
|
||||
```
|
||||
|
||||
### Using S3 Plain Rewritable Storage {#s3-plain-rewritable-storage}
|
||||
A new disk type `s3_plain_rewritable` was introduced in `24.4`.
|
||||
Similar to the `s3_plain` disk type, it does not require additional storage for metadata files; instead, metadata is stored in S3.
|
||||
Unlike `s3_plain` disk type, `s3_plain_rewritable` allows executing merges and supports INSERT operations.
|
||||
[Mutations](/docs/en/sql-reference/statements/alter#mutations) and replication of tables are not supported.
|
||||
|
||||
A use case for this disk type is non-replicated `MergeTree` tables. Although the `s3` disk type is suitable for non-replicated
|
||||
MergeTree tables, you may opt for the `s3_plain_rewritable` disk type if you do not require local metadata for the table and are
|
||||
willing to accept a limited set of operations. This could be useful, for example, for system tables.
|
||||
|
||||
Configuration:
|
||||
``` xml
|
||||
<s3_plain_rewritable>
|
||||
<type>s3_plain_rewritable</type>
|
||||
<endpoint>https://s3.eu-west-1.amazonaws.com/clickhouse-eu-west-1.clickhouse.com/data/</endpoint>
|
||||
<use_environment_credentials>1</use_environment_credentials>
|
||||
</s3_plain_rewritable>
|
||||
```
|
||||
|
||||
is equal to
|
||||
``` xml
|
||||
<s3_plain_rewritable>
|
||||
<type>object_storage</type>
|
||||
<object_storage_type>s3</object_storage_type>
|
||||
<metadata_type>plain_rewritable</metadata_type>
|
||||
<endpoint>https://s3.eu-west-1.amazonaws.com/clickhouse-eu-west-1.clickhouse.com/data/</endpoint>
|
||||
<use_environment_credentials>1</use_environment_credentials>
|
||||
</s3_plain_rewritable>
|
||||
```
|
||||
|
||||
### Using Azure Blob Storage {#azure-blob-storage}
|
||||
|
||||
`MergeTree` family table engines can store data to [Azure Blob Storage](https://azure.microsoft.com/en-us/services/storage/blobs/) using a disk with type `azure_blob_storage`.
|
||||
|
@ -121,9 +121,12 @@ if (BUILD_STANDALONE_KEEPER)
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/DiskType.cpp
|
||||
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/ObjectStorages/IObjectStorage.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/ObjectStorages/MetadataOperationsHolder.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/ObjectStorages/MetadataStorageFromPlainObjectStorageOperations.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/ObjectStorages/MetadataStorageFromPlainObjectStorage.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/ObjectStorages/MetadataStorageFromPlainRewritableObjectStorage.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/ObjectStorages/MetadataStorageFromDisk.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/ObjectStorages/MetadataFromDiskTransactionState.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/ObjectStorages/MetadataStorageTransactionState.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/ObjectStorages/DiskObjectStorageMetadata.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/ObjectStorages/MetadataStorageFromDiskTransactionOperations.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/ObjectStorages/DiskObjectStorage.cpp
|
||||
@ -137,6 +140,7 @@ if (BUILD_STANDALONE_KEEPER)
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/ObjectStorages/S3/S3Capabilities.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/ObjectStorages/S3/diskSettings.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/ObjectStorages/S3/DiskS3Utils.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/ObjectStorages/CommonPathPrefixKeyGenerator.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/ObjectStorages/ObjectStorageFactory.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/ObjectStorages/MetadataStorageFactory.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/ObjectStorages/RegisterDiskObjectStorage.cpp
|
||||
|
@ -14,10 +14,7 @@ public:
|
||||
, re_gen(key_template)
|
||||
{
|
||||
}
|
||||
DB::ObjectStorageKey generate(const String &) const override
|
||||
{
|
||||
return DB::ObjectStorageKey::createAsAbsolute(re_gen.generate());
|
||||
}
|
||||
DB::ObjectStorageKey generate(const String &, bool) const override { return DB::ObjectStorageKey::createAsAbsolute(re_gen.generate()); }
|
||||
|
||||
private:
|
||||
String key_template;
|
||||
@ -32,7 +29,7 @@ public:
|
||||
: key_prefix(std::move(key_prefix_))
|
||||
{}
|
||||
|
||||
DB::ObjectStorageKey generate(const String &) const override
|
||||
DB::ObjectStorageKey generate(const String &, bool) const override
|
||||
{
|
||||
/// Path to store the new S3 object.
|
||||
|
||||
@ -63,7 +60,7 @@ public:
|
||||
: key_prefix(std::move(key_prefix_))
|
||||
{}
|
||||
|
||||
DB::ObjectStorageKey generate(const String & path) const override
|
||||
DB::ObjectStorageKey generate(const String & path, bool) const override
|
||||
{
|
||||
return DB::ObjectStorageKey::createAsRelative(key_prefix, path);
|
||||
}
|
||||
|
@ -1,7 +1,7 @@
|
||||
#pragma once
|
||||
|
||||
#include "ObjectStorageKey.h"
|
||||
#include <memory>
|
||||
#include "ObjectStorageKey.h"
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -9,8 +9,9 @@ namespace DB
|
||||
class IObjectStorageKeysGenerator
|
||||
{
|
||||
public:
|
||||
virtual ObjectStorageKey generate(const String & path) const = 0;
|
||||
virtual ~IObjectStorageKeysGenerator() = default;
|
||||
|
||||
virtual ObjectStorageKey generate(const String & path, bool is_directory) const = 0;
|
||||
};
|
||||
|
||||
using ObjectStorageKeysGeneratorPtr = std::shared_ptr<IObjectStorageKeysGenerator>;
|
||||
|
@ -16,6 +16,8 @@ MetadataStorageType metadataTypeFromString(const String & type)
|
||||
return MetadataStorageType::Local;
|
||||
if (check_type == "plain")
|
||||
return MetadataStorageType::Plain;
|
||||
if (check_type == "plain_rewritable")
|
||||
return MetadataStorageType::PlainRewritable;
|
||||
if (check_type == "web")
|
||||
return MetadataStorageType::StaticWeb;
|
||||
|
||||
|
@ -28,6 +28,7 @@ enum class MetadataStorageType
|
||||
None,
|
||||
Local,
|
||||
Plain,
|
||||
PlainRewritable,
|
||||
StaticWeb,
|
||||
};
|
||||
|
||||
|
@ -363,6 +363,8 @@ public:
|
||||
|
||||
virtual bool isWriteOnce() const { return false; }
|
||||
|
||||
virtual bool supportsHardLinks() const { return true; }
|
||||
|
||||
/// Check if disk is broken. Broken disks will have 0 space and cannot be used.
|
||||
virtual bool isBroken() const { return false; }
|
||||
|
||||
|
72
src/Disks/ObjectStorages/CommonPathPrefixKeyGenerator.cpp
Normal file
72
src/Disks/ObjectStorages/CommonPathPrefixKeyGenerator.cpp
Normal file
@ -0,0 +1,72 @@
|
||||
#include "CommonPathPrefixKeyGenerator.h"
|
||||
|
||||
#include <Common/getRandomASCIIString.h>
|
||||
|
||||
#include <deque>
|
||||
#include <filesystem>
|
||||
#include <tuple>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/// Builds a key generator.
///
/// key_prefix_   - key prefix used when no mapped prefix is found for a path (see generate()).
/// shared_mutex_ - mutex taken in shared mode while the path map is read; stored by reference.
/// path_map_     - weak reference to the local-to-remote path map.
///
/// Both by-value arguments are moved into the members to avoid an extra copy.
CommonPathPrefixKeyGenerator::CommonPathPrefixKeyGenerator(
    String key_prefix_, SharedMutex & shared_mutex_, std::weak_ptr<PathMap> path_map_)
    : storage_key_prefix(std::move(key_prefix_)), shared_mutex(shared_mutex_), path_map(std::move(path_map_))
{
}
|
||||
|
||||
/// Produces the object storage key for the local `path`.
///
/// The key starts from the longest already-mapped prefix of `path`, or from
/// `storage_key_prefix` when nothing is mapped. The remaining path components
/// are then appended:
///  - for files and for directories without a mapped prefix, all of them verbatim;
///  - for other directories, all but the last one verbatim, and a pseudorandom
///    string in place of the last one.
ObjectStorageKey CommonPathPrefixKeyGenerator::generate(const String & path, bool is_directory) const
{
    const auto & [mapped_prefix, unresolved_parts] = getLongestObjectKeyPrefix(path);
    const bool has_mapped_prefix = !mapped_prefix.empty();

    std::filesystem::path key(has_mapped_prefix ? mapped_prefix : storage_key_prefix);

    /// Nothing is left unresolved: the path itself is already mapped.
    if (unresolved_parts.empty())
        return ObjectStorageKey::createAsRelative(std::move(key));

    /// Only a directory located under a mapped prefix gets its last component randomized.
    const bool randomize_last_part = is_directory && has_mapped_prefix;
    const size_t verbatim_count = randomize_last_part ? unresolved_parts.size() - 1 : unresolved_parts.size();

    for (size_t idx = 0; idx < verbatim_count; ++idx)
        key /= unresolved_parts[idx];

    if (randomize_last_part)
    {
        constexpr size_t random_part_length = 16;
        key /= getRandomASCIIString(random_part_length);
    }

    return ObjectStorageKey::createAsRelative(key);
}
|
||||
|
||||
std::tuple<std::string, std::vector<std::string>> CommonPathPrefixKeyGenerator::getLongestObjectKeyPrefix(const std::string & path) const
|
||||
{
|
||||
std::filesystem::path p(path);
|
||||
std::deque<std::string> dq;
|
||||
|
||||
std::shared_lock lock(shared_mutex);
|
||||
|
||||
auto ptr = path_map.lock();
|
||||
|
||||
while (p != p.root_path())
|
||||
{
|
||||
auto it = ptr->find(p / "");
|
||||
if (it != ptr->end())
|
||||
{
|
||||
std::vector<std::string> vec(std::make_move_iterator(dq.begin()), std::make_move_iterator(dq.end()));
|
||||
return std::make_tuple(it->second, std::move(vec));
|
||||
}
|
||||
|
||||
if (!p.filename().empty())
|
||||
dq.push_front(p.filename());
|
||||
|
||||
p = p.parent_path();
|
||||
}
|
||||
|
||||
return {std::string(), std::vector<std::string>(std::make_move_iterator(dq.begin()), std::make_move_iterator(dq.end()))};
|
||||
}
|
||||
|
||||
}
|
41
src/Disks/ObjectStorages/CommonPathPrefixKeyGenerator.h
Normal file
41
src/Disks/ObjectStorages/CommonPathPrefixKeyGenerator.h
Normal file
@ -0,0 +1,41 @@
|
||||
#pragma once
|
||||
|
||||
#include <Common/ObjectStorageKeyGenerator.h>
|
||||
#include <Common/SharedMutex.h>
|
||||
|
||||
#include <filesystem>
|
||||
#include <map>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/// Object storage key generator used specifically with the
|
||||
/// MetadataStorageFromPlainObjectStorage if multiple writes are allowed.
|
||||
|
||||
/// It searches for the local (metadata) path in a pre-loaded path map.
|
||||
/// If no such path exists, it searches for the parent path, until it is found
|
||||
/// or no parent path exists.
|
||||
///
|
||||
/// The key generator ensures that the original directory hierarchy is
|
||||
/// preserved, which is required for the MergeTree family.
|
||||
class CommonPathPrefixKeyGenerator : public IObjectStorageKeysGenerator
|
||||
{
|
||||
public:
|
||||
/// Local to remote path map. Leverages filesystem::path comparator for paths.
|
||||
using PathMap = std::map<std::filesystem::path, std::string>;
|
||||
|
||||
explicit CommonPathPrefixKeyGenerator(String key_prefix_, SharedMutex & shared_mutex_, std::weak_ptr<PathMap> path_map_);
|
||||
|
||||
ObjectStorageKey generate(const String & path, bool is_directory) const override;
|
||||
|
||||
private:
|
||||
/// Longest key prefix and unresolved parts of the source path.
|
||||
std::tuple<std::string, std::vector<String>> getLongestObjectKeyPrefix(const String & path) const;
|
||||
|
||||
const String storage_key_prefix;
|
||||
|
||||
SharedMutex & shared_mutex;
|
||||
std::weak_ptr<PathMap> path_map;
|
||||
};
|
||||
|
||||
}
|
@ -112,20 +112,21 @@ size_t DiskObjectStorage::getFileSize(const String & path) const
|
||||
return metadata_storage->getFileSize(path);
|
||||
}
|
||||
|
||||
void DiskObjectStorage::moveDirectory(const String & from_path, const String & to_path)
|
||||
{
|
||||
if (send_metadata)
|
||||
sendMoveMetadata(from_path, to_path);
|
||||
|
||||
auto transaction = createObjectStorageTransaction();
|
||||
transaction->moveDirectory(from_path, to_path);
|
||||
transaction->commit();
|
||||
}
|
||||
|
||||
void DiskObjectStorage::moveFile(const String & from_path, const String & to_path, bool should_send_metadata)
|
||||
{
|
||||
|
||||
if (should_send_metadata)
|
||||
{
|
||||
auto revision = metadata_helper->revision_counter + 1;
|
||||
metadata_helper->revision_counter += 1;
|
||||
|
||||
const ObjectAttributes object_metadata {
|
||||
{"from_path", from_path},
|
||||
{"to_path", to_path}
|
||||
};
|
||||
metadata_helper->createFileOperationObject("rename", revision, object_metadata);
|
||||
}
|
||||
sendMoveMetadata(from_path, to_path);
|
||||
|
||||
auto transaction = createObjectStorageTransaction();
|
||||
transaction->moveFile(from_path, to_path);
|
||||
@ -409,6 +410,15 @@ bool DiskObjectStorage::tryReserve(UInt64 bytes)
|
||||
|
||||
return false;
|
||||
}
|
||||
void DiskObjectStorage::sendMoveMetadata(const String & from_path, const String & to_path)
|
||||
{
|
||||
chassert(send_metadata);
|
||||
auto revision = metadata_helper->revision_counter + 1;
|
||||
metadata_helper->revision_counter += 1;
|
||||
|
||||
const ObjectAttributes object_metadata{{"from_path", from_path}, {"to_path", to_path}};
|
||||
metadata_helper->createFileOperationObject("rename", revision, object_metadata);
|
||||
}
|
||||
|
||||
bool DiskObjectStorage::supportsCache() const
|
||||
{
|
||||
@ -425,6 +435,11 @@ bool DiskObjectStorage::isWriteOnce() const
|
||||
return object_storage->isWriteOnce();
|
||||
}
|
||||
|
||||
bool DiskObjectStorage::supportsHardLinks() const
|
||||
{
|
||||
return !isWriteOnce() && !object_storage->isPlain();
|
||||
}
|
||||
|
||||
DiskObjectStoragePtr DiskObjectStorage::createDiskObjectStorage()
|
||||
{
|
||||
const auto config_prefix = "storage_configuration.disks." + name;
|
||||
|
@ -112,7 +112,7 @@ public:
|
||||
|
||||
void clearDirectory(const String & path) override;
|
||||
|
||||
void moveDirectory(const String & from_path, const String & to_path) override { moveFile(from_path, to_path); }
|
||||
void moveDirectory(const String & from_path, const String & to_path) override;
|
||||
|
||||
void removeDirectory(const String & path) override;
|
||||
|
||||
@ -183,6 +183,8 @@ public:
|
||||
/// MergeTree table on this disk.
|
||||
bool isWriteOnce() const override;
|
||||
|
||||
bool supportsHardLinks() const override;
|
||||
|
||||
/// Get structure of object storage this disk works with. Examples:
|
||||
/// DiskObjectStorage(S3ObjectStorage)
|
||||
/// DiskObjectStorage(CachedObjectStorage(S3ObjectStorage))
|
||||
@ -228,6 +230,7 @@ private:
|
||||
std::mutex reservation_mutex;
|
||||
|
||||
bool tryReserve(UInt64 bytes);
|
||||
void sendMoveMetadata(const String & from_path, const String & to_path);
|
||||
|
||||
const bool send_metadata;
|
||||
|
||||
|
@ -507,7 +507,7 @@ struct CopyFileObjectStorageOperation final : public IDiskObjectStorageOperation
|
||||
std::string to_path;
|
||||
|
||||
StoredObjects created_objects;
|
||||
IObjectStorage& destination_object_storage;
|
||||
IObjectStorage & destination_object_storage;
|
||||
|
||||
CopyFileObjectStorageOperation(
|
||||
IObjectStorage & object_storage_,
|
||||
@ -714,7 +714,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskObjectStorageTransaction::writeFile
|
||||
{
|
||||
/// Otherwise we will produce lost blobs which nobody points to
|
||||
/// WriteOnce storages are not affected by the issue
|
||||
if (!tx->object_storage.isWriteOnce() && tx->metadata_storage.exists(path))
|
||||
if (!tx->object_storage.isPlain() && tx->metadata_storage.exists(path))
|
||||
tx->object_storage.removeObjectsIfExist(tx->metadata_storage.getStorageObjects(path));
|
||||
|
||||
tx->metadata_transaction->createMetadataFile(path, key_, count);
|
||||
@ -747,10 +747,9 @@ std::unique_ptr<WriteBufferFromFileBase> DiskObjectStorageTransaction::writeFile
|
||||
{
|
||||
/// Otherwise we will produce lost blobs which nobody points to
|
||||
/// WriteOnce storages are not affected by the issue
|
||||
if (!object_storage_tx->object_storage.isWriteOnce() && object_storage_tx->metadata_storage.exists(path))
|
||||
if (!object_storage_tx->object_storage.isPlain() && object_storage_tx->metadata_storage.exists(path))
|
||||
{
|
||||
object_storage_tx->object_storage.removeObjectsIfExist(
|
||||
object_storage_tx->metadata_storage.getStorageObjects(path));
|
||||
object_storage_tx->object_storage.removeObjectsIfExist(object_storage_tx->metadata_storage.getStorageObjects(path));
|
||||
}
|
||||
|
||||
tx->createMetadataFile(path, key_, count);
|
||||
@ -877,14 +876,14 @@ void DiskObjectStorageTransaction::createFile(const std::string & path)
|
||||
|
||||
void DiskObjectStorageTransaction::copyFile(const std::string & from_file_path, const std::string & to_file_path, const ReadSettings & read_settings, const WriteSettings & write_settings)
|
||||
{
|
||||
operations_to_execute.emplace_back(
|
||||
std::make_unique<CopyFileObjectStorageOperation>(object_storage, metadata_storage, object_storage, read_settings, write_settings, from_file_path, to_file_path));
|
||||
operations_to_execute.emplace_back(std::make_unique<CopyFileObjectStorageOperation>(
|
||||
object_storage, metadata_storage, object_storage, read_settings, write_settings, from_file_path, to_file_path));
|
||||
}
|
||||
|
||||
void MultipleDisksObjectStorageTransaction::copyFile(const std::string & from_file_path, const std::string & to_file_path, const ReadSettings & read_settings, const WriteSettings & write_settings)
|
||||
{
|
||||
operations_to_execute.emplace_back(
|
||||
std::make_unique<CopyFileObjectStorageOperation>(object_storage, metadata_storage, destination_object_storage, read_settings, write_settings, from_file_path, to_file_path));
|
||||
operations_to_execute.emplace_back(std::make_unique<CopyFileObjectStorageOperation>(
|
||||
object_storage, metadata_storage, destination_object_storage, read_settings, write_settings, from_file_path, to_file_path));
|
||||
}
|
||||
|
||||
void DiskObjectStorageTransaction::commit()
|
||||
|
19
src/Disks/ObjectStorages/IMetadataOperation.h
Normal file
19
src/Disks/ObjectStorages/IMetadataOperation.h
Normal file
@ -0,0 +1,19 @@
|
||||
#pragma once
|
||||
|
||||
#include <mutex>
|
||||
#include <Common/SharedMutex.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
struct IMetadataOperation
|
||||
{
|
||||
virtual void execute(std::unique_lock<SharedMutex> & metadata_lock) = 0;
|
||||
virtual void undo(std::unique_lock<SharedMutex> & metadata_lock) = 0;
|
||||
virtual void finalize() { }
|
||||
virtual ~IMetadataOperation() = default;
|
||||
};
|
||||
|
||||
using MetadataOperationPtr = std::unique_ptr<IMetadataOperation>;
|
||||
|
||||
}
|
@ -145,7 +145,7 @@ public:
|
||||
|
||||
virtual ~IMetadataTransaction() = default;
|
||||
|
||||
private:
|
||||
protected:
|
||||
[[noreturn]] static void throwNotImplemented()
|
||||
{
|
||||
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Operation is not implemented");
|
||||
@ -229,7 +229,7 @@ public:
|
||||
/// object_storage_path is absolute.
|
||||
virtual StoredObjects getStorageObjects(const std::string & path) const = 0;
|
||||
|
||||
private:
|
||||
protected:
|
||||
[[noreturn]] static void throwNotImplemented()
|
||||
{
|
||||
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Operation is not implemented");
|
||||
|
@ -1,11 +1,12 @@
|
||||
#include <Disks/ObjectStorages/IObjectStorage.h>
|
||||
#include <Disks/IO/ThreadPoolRemoteFSReader.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <Disks/ObjectStorages/IObjectStorage.h>
|
||||
#include <Disks/ObjectStorages/ObjectStorageIterator.h>
|
||||
#include <IO/ReadBufferFromFileBase.h>
|
||||
#include <IO/WriteBufferFromFileBase.h>
|
||||
#include <IO/copyData.h>
|
||||
#include <IO/ReadBufferFromFileBase.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Disks/ObjectStorages/ObjectStorageIterator.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/ObjectStorageKeyGenerator.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
|
@ -83,6 +83,9 @@ using ObjectKeysWithMetadata = std::vector<ObjectKeyWithMetadata>;
|
||||
class IObjectStorageIterator;
|
||||
using ObjectStorageIteratorPtr = std::shared_ptr<IObjectStorageIterator>;
|
||||
|
||||
class IObjectStorageKeysGenerator;
|
||||
using ObjectStorageKeysGeneratorPtr = std::shared_ptr<IObjectStorageKeysGenerator>;
|
||||
|
||||
/// Base class for all object storages which implement some subset of ordinary filesystem operations.
|
||||
///
|
||||
/// Examples of object storages are S3, Azure Blob Storage, HDFS.
|
||||
@ -208,6 +211,12 @@ public:
|
||||
/// Path can be generated either independently or based on `path`.
|
||||
virtual ObjectStorageKey generateObjectKeyForPath(const std::string & path) const = 0;
|
||||
|
||||
/// Object key prefix for local paths in the directory 'path'.
|
||||
virtual ObjectStorageKey generateObjectKeyPrefixForDirectoryPath(const std::string & /* path */) const
|
||||
{
|
||||
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method 'generateObjectKeyPrefixForDirectoryPath' is not implemented");
|
||||
}
|
||||
|
||||
/// Get unique id for passed absolute path in object storage.
|
||||
virtual std::string getUniqueId(const std::string & path) const { return path; }
|
||||
|
||||
@ -226,6 +235,8 @@ public:
|
||||
|
||||
virtual WriteSettings patchSettings(const WriteSettings & write_settings) const;
|
||||
|
||||
virtual void setKeysGenerator(ObjectStorageKeysGeneratorPtr) { }
|
||||
|
||||
#if USE_AZURE_BLOB_STORAGE
|
||||
virtual std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> getAzureBlobStorageClient()
|
||||
{
|
||||
|
@ -1,23 +0,0 @@
|
||||
#include <base/defines.h>
|
||||
#include <Disks/ObjectStorages/MetadataFromDiskTransactionState.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
std::string toString(MetadataFromDiskTransactionState state)
|
||||
{
|
||||
switch (state)
|
||||
{
|
||||
case MetadataFromDiskTransactionState::PREPARING:
|
||||
return "PREPARING";
|
||||
case MetadataFromDiskTransactionState::FAILED:
|
||||
return "FAILED";
|
||||
case MetadataFromDiskTransactionState::COMMITTED:
|
||||
return "COMMITTED";
|
||||
case MetadataFromDiskTransactionState::PARTIALLY_ROLLED_BACK:
|
||||
return "PARTIALLY_ROLLED_BACK";
|
||||
}
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
}
|
93
src/Disks/ObjectStorages/MetadataOperationsHolder.cpp
Normal file
93
src/Disks/ObjectStorages/MetadataOperationsHolder.cpp
Normal file
@ -0,0 +1,93 @@
|
||||
#include "MetadataOperationsHolder.h"
|
||||
|
||||
#include <Common/Exception.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int FS_METADATA_ERROR;
|
||||
}
|
||||
|
||||
void MetadataOperationsHolder::rollback(std::unique_lock<SharedMutex> & lock, size_t until_pos)
|
||||
{
|
||||
/// Otherwise everything is alright
|
||||
if (state == MetadataStorageTransactionState::FAILED)
|
||||
{
|
||||
for (int64_t i = until_pos; i >= 0; --i)
|
||||
{
|
||||
try
|
||||
{
|
||||
operations[i]->undo(lock);
|
||||
}
|
||||
catch (Exception & ex)
|
||||
{
|
||||
state = MetadataStorageTransactionState::PARTIALLY_ROLLED_BACK;
|
||||
ex.addMessage(fmt::format("While rolling back operation #{}", i));
|
||||
throw;
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
/// Nothing to do, transaction committed or not even started to commit
|
||||
}
|
||||
}
|
||||
|
||||
/// Appends `operation` to the list of pending operations.
/// Throws FS_METADATA_ERROR when the transaction is not in the PREPARING state.
void MetadataOperationsHolder::addOperation(MetadataOperationPtr && operation)
{
    const bool is_preparing = (state == MetadataStorageTransactionState::PREPARING);
    if (!is_preparing)
    {
        throw Exception(
            ErrorCodes::FS_METADATA_ERROR,
            "Cannot add operations to transaction in {} state, it should be in {} state",
            toString(state),
            toString(MetadataStorageTransactionState::PREPARING));
    }

    operations.push_back(std::move(operation));
}
|
||||
|
||||
/// Executes all pending operations and then finalizes them.
///
/// 1. Throws FS_METADATA_ERROR unless the transaction is in the PREPARING state.
/// 2. Under an exclusive lock on `metadata_mutex`, executes the operations in order.
///    If one throws an `Exception`, the state becomes FAILED, rollback() is invoked
///    starting from the failing operation, and the exception is rethrown.
/// 3. After the lock is released, calls finalize() on every operation; failures
///    at this step are only logged.
/// 4. Sets the state to COMMITTED.
void MetadataOperationsHolder::commitImpl(SharedMutex & metadata_mutex)
{
    if (state != MetadataStorageTransactionState::PREPARING)
    {
        throw Exception(
            ErrorCodes::FS_METADATA_ERROR,
            "Cannot commit transaction in {} state, it should be in {} state",
            toString(state),
            toString(MetadataStorageTransactionState::PREPARING));
    }

    const size_t total = operations.size();

    {
        std::unique_lock lock(metadata_mutex);

        for (size_t pos = 0; pos < total; ++pos)
        {
            try
            {
                operations[pos]->execute(lock);
            }
            /// NOTE(review): only `Exception` triggers the rollback; other exception
            /// types propagate without any undo - confirm this is intended.
            catch (Exception & ex)
            {
                tryLogCurrentException(__PRETTY_FUNCTION__);
                ex.addMessage(fmt::format("While committing metadata operation #{}", pos));
                state = MetadataStorageTransactionState::FAILED;
                rollback(lock, pos);
                throw;
            }
        }
    }

    /// Finalization is best effort: an error in one operation does not stop the others.
    for (size_t pos = 0; pos < total; ++pos)
    {
        try
        {
            operations[pos]->finalize();
        }
        catch (...)
        {
            tryLogCurrentException(__PRETTY_FUNCTION__, fmt::format("Failed to finalize operation #{}", pos));
        }
    }

    state = MetadataStorageTransactionState::COMMITTED;
}
|
||||
}
|
29
src/Disks/ObjectStorages/MetadataOperationsHolder.h
Normal file
29
src/Disks/ObjectStorages/MetadataOperationsHolder.h
Normal file
@ -0,0 +1,29 @@
|
||||
#pragma once
|
||||
|
||||
#include <mutex>
|
||||
#include <Disks/ObjectStorages/IMetadataOperation.h>
|
||||
#include <Disks/ObjectStorages/MetadataStorageTransactionState.h>
|
||||
#include <Common/SharedMutex.h>
|
||||
|
||||
/**
|
||||
* Implementations for transactional operations with metadata used by MetadataStorageFromDisk
|
||||
* and MetadataStorageFromPlainObjectStorage.
|
||||
*/
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class MetadataOperationsHolder
|
||||
{
|
||||
private:
|
||||
std::vector<MetadataOperationPtr> operations;
|
||||
MetadataStorageTransactionState state{MetadataStorageTransactionState::PREPARING};
|
||||
|
||||
void rollback(std::unique_lock<SharedMutex> & lock, size_t until_pos);
|
||||
|
||||
protected:
|
||||
void addOperation(MetadataOperationPtr && operation);
|
||||
void commitImpl(SharedMutex & metadata_mutex);
|
||||
};
|
||||
|
||||
}
|
@ -1,6 +1,7 @@
|
||||
#include <Disks/ObjectStorages/MetadataStorageFactory.h>
|
||||
#include <Disks/ObjectStorages/MetadataStorageFromDisk.h>
|
||||
#include <Disks/ObjectStorages/MetadataStorageFromPlainObjectStorage.h>
|
||||
#include <Disks/ObjectStorages/MetadataStorageFromPlainRewritableObjectStorage.h>
|
||||
#ifndef CLICKHOUSE_KEEPER_STANDALONE_BUILD
|
||||
#include <Disks/ObjectStorages/Web/MetadataStorageFromStaticFilesWebServer.h>
|
||||
#endif
|
||||
@ -118,6 +119,20 @@ void registerPlainMetadataStorage(MetadataStorageFactory & factory)
|
||||
});
|
||||
}
|
||||
|
||||
/// Registers the "plain_rewritable" metadata storage type in `factory`.
/// The registered creator builds a MetadataStorageFromPlainRewritableObjectStorage
/// from the object storage and the key prefix returned by getObjectKeyCompatiblePrefix().
void registerPlainRewritableMetadataStorage(MetadataStorageFactory & factory)
{
    auto create_storage = [](const std::string & /* name */,
                             const Poco::Util::AbstractConfiguration & config,
                             const std::string & config_prefix,
                             ObjectStoragePtr object_storage) -> MetadataStoragePtr
    {
        auto key_compatibility_prefix = getObjectKeyCompatiblePrefix(*object_storage, config, config_prefix);
        return std::make_shared<MetadataStorageFromPlainRewritableObjectStorage>(object_storage, key_compatibility_prefix);
    };

    factory.registerMetadataStorageType("plain_rewritable", create_storage);
}
|
||||
|
||||
#ifndef CLICKHOUSE_KEEPER_STANDALONE_BUILD
|
||||
void registerMetadataStorageFromStaticFilesWebServer(MetadataStorageFactory & factory)
|
||||
{
|
||||
@ -137,6 +152,7 @@ void registerMetadataStorages()
|
||||
auto & factory = MetadataStorageFactory::instance();
|
||||
registerMetadataStorageFromDisk(factory);
|
||||
registerPlainMetadataStorage(factory);
|
||||
registerPlainRewritableMetadataStorage(factory);
|
||||
#ifndef CLICKHOUSE_KEEPER_STANDALONE_BUILD
|
||||
registerMetadataStorageFromStaticFilesWebServer(factory);
|
||||
#endif
|
||||
|
@ -10,14 +10,8 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int FS_METADATA_ERROR;
|
||||
}
|
||||
|
||||
MetadataStorageFromDisk::MetadataStorageFromDisk(DiskPtr disk_, String compatible_key_prefix_)
|
||||
: disk(disk_)
|
||||
, compatible_key_prefix(compatible_key_prefix_)
|
||||
: disk(disk_), compatible_key_prefix(compatible_key_prefix_)
|
||||
{
|
||||
}
|
||||
|
||||
@ -158,83 +152,9 @@ const IMetadataStorage & MetadataStorageFromDiskTransaction::getStorageForNonTra
|
||||
return metadata_storage;
|
||||
}
|
||||
|
||||
void MetadataStorageFromDiskTransaction::addOperation(MetadataOperationPtr && operation)
|
||||
{
|
||||
if (state != MetadataFromDiskTransactionState::PREPARING)
|
||||
throw Exception(
|
||||
ErrorCodes::FS_METADATA_ERROR,
|
||||
"Cannot add operations to transaction in {} state, it should be in {} state",
|
||||
toString(state), toString(MetadataFromDiskTransactionState::PREPARING));
|
||||
|
||||
operations.emplace_back(std::move(operation));
|
||||
}
|
||||
|
||||
void MetadataStorageFromDiskTransaction::commit()
|
||||
{
|
||||
if (state != MetadataFromDiskTransactionState::PREPARING)
|
||||
throw Exception(
|
||||
ErrorCodes::FS_METADATA_ERROR,
|
||||
"Cannot commit transaction in {} state, it should be in {} state",
|
||||
toString(state), toString(MetadataFromDiskTransactionState::PREPARING));
|
||||
|
||||
{
|
||||
std::unique_lock lock(metadata_storage.metadata_mutex);
|
||||
for (size_t i = 0; i < operations.size(); ++i)
|
||||
{
|
||||
try
|
||||
{
|
||||
operations[i]->execute(lock);
|
||||
}
|
||||
catch (Exception & ex)
|
||||
{
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
ex.addMessage(fmt::format("While committing metadata operation #{}", i));
|
||||
state = MetadataFromDiskTransactionState::FAILED;
|
||||
rollback(i);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Do it in "best effort" mode
|
||||
for (size_t i = 0; i < operations.size(); ++i)
|
||||
{
|
||||
try
|
||||
{
|
||||
operations[i]->finalize();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__, fmt::format("Failed to finalize operation #{}", i));
|
||||
}
|
||||
}
|
||||
|
||||
state = MetadataFromDiskTransactionState::COMMITTED;
|
||||
}
|
||||
|
||||
void MetadataStorageFromDiskTransaction::rollback(size_t until_pos)
|
||||
{
|
||||
/// Otherwise everything is alright
|
||||
if (state == MetadataFromDiskTransactionState::FAILED)
|
||||
{
|
||||
for (int64_t i = until_pos; i >= 0; --i)
|
||||
{
|
||||
try
|
||||
{
|
||||
operations[i]->undo();
|
||||
}
|
||||
catch (Exception & ex)
|
||||
{
|
||||
state = MetadataFromDiskTransactionState::PARTIALLY_ROLLED_BACK;
|
||||
ex.addMessage(fmt::format("While rolling back operation #{}", i));
|
||||
throw;
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
/// Nothing to do, transaction committed or not even started to commit
|
||||
}
|
||||
MetadataOperationsHolder::commitImpl(metadata_storage.metadata_mutex);
|
||||
}
|
||||
|
||||
void MetadataStorageFromDiskTransaction::writeStringToFile(
|
||||
|
@ -5,8 +5,9 @@
|
||||
|
||||
#include <Disks/IDisk.h>
|
||||
#include <Disks/ObjectStorages/DiskObjectStorageMetadata.h>
|
||||
#include <Disks/ObjectStorages/MetadataFromDiskTransactionState.h>
|
||||
#include <Disks/ObjectStorages/MetadataOperationsHolder.h>
|
||||
#include <Disks/ObjectStorages/MetadataStorageFromDiskTransactionOperations.h>
|
||||
#include <Disks/ObjectStorages/MetadataStorageTransactionState.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -74,18 +75,11 @@ public:
|
||||
DiskObjectStorageMetadataPtr readMetadataUnlocked(const std::string & path, std::shared_lock<SharedMutex> & lock) const;
|
||||
};
|
||||
|
||||
class MetadataStorageFromDiskTransaction final : public IMetadataTransaction
|
||||
class MetadataStorageFromDiskTransaction final : public IMetadataTransaction, private MetadataOperationsHolder
|
||||
{
|
||||
private:
|
||||
const MetadataStorageFromDisk & metadata_storage;
|
||||
|
||||
std::vector<MetadataOperationPtr> operations;
|
||||
MetadataFromDiskTransactionState state{MetadataFromDiskTransactionState::PREPARING};
|
||||
|
||||
void addOperation(MetadataOperationPtr && operation);
|
||||
|
||||
void rollback(size_t until_pos);
|
||||
|
||||
public:
|
||||
explicit MetadataStorageFromDiskTransaction(const MetadataStorageFromDisk & metadata_storage_)
|
||||
: metadata_storage(metadata_storage_)
|
||||
@ -135,7 +129,6 @@ public:
|
||||
|
||||
UnlinkMetadataFileOperationOutcomePtr unlinkMetadata(const std::string & path) override;
|
||||
|
||||
|
||||
};
|
||||
|
||||
|
||||
|
@ -32,7 +32,7 @@ void SetLastModifiedOperation::execute(std::unique_lock<SharedMutex> &)
|
||||
disk.setLastModified(path, new_timestamp);
|
||||
}
|
||||
|
||||
void SetLastModifiedOperation::undo()
|
||||
void SetLastModifiedOperation::undo(std::unique_lock<SharedMutex> &)
|
||||
{
|
||||
disk.setLastModified(path, old_timestamp);
|
||||
}
|
||||
@ -50,7 +50,7 @@ void ChmodOperation::execute(std::unique_lock<SharedMutex> &)
|
||||
disk.chmod(path, mode);
|
||||
}
|
||||
|
||||
void ChmodOperation::undo()
|
||||
void ChmodOperation::undo(std::unique_lock<SharedMutex> &)
|
||||
{
|
||||
disk.chmod(path, old_mode);
|
||||
}
|
||||
@ -68,7 +68,7 @@ void UnlinkFileOperation::execute(std::unique_lock<SharedMutex> &)
|
||||
disk.removeFile(path);
|
||||
}
|
||||
|
||||
void UnlinkFileOperation::undo()
|
||||
void UnlinkFileOperation::undo(std::unique_lock<SharedMutex> &)
|
||||
{
|
||||
auto buf = disk.writeFile(path);
|
||||
writeString(prev_data, *buf);
|
||||
@ -86,7 +86,7 @@ void CreateDirectoryOperation::execute(std::unique_lock<SharedMutex> &)
|
||||
disk.createDirectory(path);
|
||||
}
|
||||
|
||||
void CreateDirectoryOperation::undo()
|
||||
void CreateDirectoryOperation::undo(std::unique_lock<SharedMutex> &)
|
||||
{
|
||||
disk.removeDirectory(path);
|
||||
}
|
||||
@ -112,7 +112,7 @@ void CreateDirectoryRecursiveOperation::execute(std::unique_lock<SharedMutex> &)
|
||||
disk.createDirectory(path_to_create);
|
||||
}
|
||||
|
||||
void CreateDirectoryRecursiveOperation::undo()
|
||||
void CreateDirectoryRecursiveOperation::undo(std::unique_lock<SharedMutex> &)
|
||||
{
|
||||
for (const auto & path_created : paths_created)
|
||||
disk.removeDirectory(path_created);
|
||||
@ -129,7 +129,7 @@ void RemoveDirectoryOperation::execute(std::unique_lock<SharedMutex> &)
|
||||
disk.removeDirectory(path);
|
||||
}
|
||||
|
||||
void RemoveDirectoryOperation::undo()
|
||||
void RemoveDirectoryOperation::undo(std::unique_lock<SharedMutex> &)
|
||||
{
|
||||
disk.createDirectory(path);
|
||||
}
|
||||
@ -149,7 +149,7 @@ void RemoveRecursiveOperation::execute(std::unique_lock<SharedMutex> &)
|
||||
disk.moveDirectory(path, temp_path);
|
||||
}
|
||||
|
||||
void RemoveRecursiveOperation::undo()
|
||||
void RemoveRecursiveOperation::undo(std::unique_lock<SharedMutex> &)
|
||||
{
|
||||
if (disk.isFile(temp_path))
|
||||
disk.moveFile(temp_path, path);
|
||||
@ -187,10 +187,10 @@ void CreateHardlinkOperation::execute(std::unique_lock<SharedMutex> & lock)
|
||||
disk.createHardLink(path_from, path_to);
|
||||
}
|
||||
|
||||
void CreateHardlinkOperation::undo()
|
||||
void CreateHardlinkOperation::undo(std::unique_lock<SharedMutex> & lock)
|
||||
{
|
||||
if (write_operation)
|
||||
write_operation->undo();
|
||||
write_operation->undo(lock);
|
||||
disk.removeFile(path_to);
|
||||
}
|
||||
|
||||
@ -206,7 +206,7 @@ void MoveFileOperation::execute(std::unique_lock<SharedMutex> &)
|
||||
disk.moveFile(path_from, path_to);
|
||||
}
|
||||
|
||||
void MoveFileOperation::undo()
|
||||
void MoveFileOperation::undo(std::unique_lock<SharedMutex> &)
|
||||
{
|
||||
disk.moveFile(path_to, path_from);
|
||||
}
|
||||
@ -223,7 +223,7 @@ void MoveDirectoryOperation::execute(std::unique_lock<SharedMutex> &)
|
||||
disk.moveDirectory(path_from, path_to);
|
||||
}
|
||||
|
||||
void MoveDirectoryOperation::undo()
|
||||
void MoveDirectoryOperation::undo(std::unique_lock<SharedMutex> &)
|
||||
{
|
||||
disk.moveDirectory(path_to, path_from);
|
||||
}
|
||||
@ -244,7 +244,7 @@ void ReplaceFileOperation::execute(std::unique_lock<SharedMutex> &)
|
||||
disk.replaceFile(path_from, path_to);
|
||||
}
|
||||
|
||||
void ReplaceFileOperation::undo()
|
||||
void ReplaceFileOperation::undo(std::unique_lock<SharedMutex> &)
|
||||
{
|
||||
disk.moveFile(path_to, path_from);
|
||||
disk.moveFile(temp_path_to, path_to);
|
||||
@ -275,7 +275,7 @@ void WriteFileOperation::execute(std::unique_lock<SharedMutex> &)
|
||||
buf->finalize();
|
||||
}
|
||||
|
||||
void WriteFileOperation::undo()
|
||||
void WriteFileOperation::undo(std::unique_lock<SharedMutex> &)
|
||||
{
|
||||
if (!existed)
|
||||
{
|
||||
@ -303,10 +303,10 @@ void AddBlobOperation::execute(std::unique_lock<SharedMutex> & metadata_lock)
|
||||
write_operation->execute(metadata_lock);
|
||||
}
|
||||
|
||||
void AddBlobOperation::undo()
|
||||
void AddBlobOperation::undo(std::unique_lock<SharedMutex> & lock)
|
||||
{
|
||||
if (write_operation)
|
||||
write_operation->undo();
|
||||
write_operation->undo(lock);
|
||||
}
|
||||
|
||||
void UnlinkMetadataFileOperation::execute(std::unique_lock<SharedMutex> & metadata_lock)
|
||||
@ -325,17 +325,17 @@ void UnlinkMetadataFileOperation::execute(std::unique_lock<SharedMutex> & metada
|
||||
unlink_operation->execute(metadata_lock);
|
||||
}
|
||||
|
||||
void UnlinkMetadataFileOperation::undo()
|
||||
void UnlinkMetadataFileOperation::undo(std::unique_lock<SharedMutex> & lock)
|
||||
{
|
||||
/// Operations MUST be reverted in the reversed order, so
|
||||
/// when we apply operation #1 (write) and operation #2 (unlink)
|
||||
/// we should revert #2 and only after it #1. Otherwise #1 will overwrite
|
||||
/// file with incorrect data.
|
||||
if (unlink_operation)
|
||||
unlink_operation->undo();
|
||||
unlink_operation->undo(lock);
|
||||
|
||||
if (write_operation)
|
||||
write_operation->undo();
|
||||
write_operation->undo(lock);
|
||||
|
||||
/// Update outcome to reflect the fact that we have restored the file.
|
||||
outcome->num_hardlinks++;
|
||||
@ -349,10 +349,10 @@ void SetReadonlyFileOperation::execute(std::unique_lock<SharedMutex> & metadata_
|
||||
write_operation->execute(metadata_lock);
|
||||
}
|
||||
|
||||
void SetReadonlyFileOperation::undo()
|
||||
void SetReadonlyFileOperation::undo(std::unique_lock<SharedMutex> & lock)
|
||||
{
|
||||
if (write_operation)
|
||||
write_operation->undo();
|
||||
write_operation->undo(lock);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -1,6 +1,6 @@
|
||||
#pragma once
|
||||
|
||||
#include <Common/SharedMutex.h>
|
||||
#include <Disks/ObjectStorages/IMetadataOperation.h>
|
||||
#include <Disks/ObjectStorages/IMetadataStorage.h>
|
||||
|
||||
#include <numeric>
|
||||
@ -14,24 +14,13 @@ class IDisk;
|
||||
* Implementations for transactional operations with metadata used by MetadataStorageFromDisk.
|
||||
*/
|
||||
|
||||
struct IMetadataOperation
|
||||
{
|
||||
virtual void execute(std::unique_lock<SharedMutex> & metadata_lock) = 0;
|
||||
virtual void undo() = 0;
|
||||
virtual void finalize() {}
|
||||
virtual ~IMetadataOperation() = default;
|
||||
};
|
||||
|
||||
using MetadataOperationPtr = std::unique_ptr<IMetadataOperation>;
|
||||
|
||||
|
||||
struct SetLastModifiedOperation final : public IMetadataOperation
|
||||
{
|
||||
SetLastModifiedOperation(const std::string & path_, Poco::Timestamp new_timestamp_, IDisk & disk_);
|
||||
|
||||
void execute(std::unique_lock<SharedMutex> & metadata_lock) override;
|
||||
|
||||
void undo() override;
|
||||
void undo(std::unique_lock<SharedMutex> & metadata_lock) override;
|
||||
|
||||
private:
|
||||
std::string path;
|
||||
@ -46,7 +35,7 @@ struct ChmodOperation final : public IMetadataOperation
|
||||
|
||||
void execute(std::unique_lock<SharedMutex> & metadata_lock) override;
|
||||
|
||||
void undo() override;
|
||||
void undo(std::unique_lock<SharedMutex> & metadata_lock) override;
|
||||
|
||||
private:
|
||||
std::string path;
|
||||
@ -62,7 +51,7 @@ struct UnlinkFileOperation final : public IMetadataOperation
|
||||
|
||||
void execute(std::unique_lock<SharedMutex> & metadata_lock) override;
|
||||
|
||||
void undo() override;
|
||||
void undo(std::unique_lock<SharedMutex> & metadata_lock) override;
|
||||
|
||||
private:
|
||||
std::string path;
|
||||
@ -77,7 +66,7 @@ struct CreateDirectoryOperation final : public IMetadataOperation
|
||||
|
||||
void execute(std::unique_lock<SharedMutex> & metadata_lock) override;
|
||||
|
||||
void undo() override;
|
||||
void undo(std::unique_lock<SharedMutex> & metadata_lock) override;
|
||||
|
||||
private:
|
||||
std::string path;
|
||||
@ -91,7 +80,7 @@ struct CreateDirectoryRecursiveOperation final : public IMetadataOperation
|
||||
|
||||
void execute(std::unique_lock<SharedMutex> & metadata_lock) override;
|
||||
|
||||
void undo() override;
|
||||
void undo(std::unique_lock<SharedMutex> & metadata_lock) override;
|
||||
|
||||
private:
|
||||
std::string path;
|
||||
@ -106,7 +95,7 @@ struct RemoveDirectoryOperation final : public IMetadataOperation
|
||||
|
||||
void execute(std::unique_lock<SharedMutex> & metadata_lock) override;
|
||||
|
||||
void undo() override;
|
||||
void undo(std::unique_lock<SharedMutex> & metadata_lock) override;
|
||||
|
||||
private:
|
||||
std::string path;
|
||||
@ -119,7 +108,7 @@ struct RemoveRecursiveOperation final : public IMetadataOperation
|
||||
|
||||
void execute(std::unique_lock<SharedMutex> & metadata_lock) override;
|
||||
|
||||
void undo() override;
|
||||
void undo(std::unique_lock<SharedMutex> & metadata_lock) override;
|
||||
|
||||
void finalize() override;
|
||||
|
||||
@ -135,7 +124,8 @@ struct WriteFileOperation final : public IMetadataOperation
|
||||
|
||||
void execute(std::unique_lock<SharedMutex> & metadata_lock) override;
|
||||
|
||||
void undo() override;
|
||||
void undo(std::unique_lock<SharedMutex> & metadata_lock) override;
|
||||
|
||||
private:
|
||||
std::string path;
|
||||
IDisk & disk;
|
||||
@ -154,7 +144,7 @@ struct CreateHardlinkOperation final : public IMetadataOperation
|
||||
|
||||
void execute(std::unique_lock<SharedMutex> & metadata_lock) override;
|
||||
|
||||
void undo() override;
|
||||
void undo(std::unique_lock<SharedMutex> & metadata_lock) override;
|
||||
|
||||
private:
|
||||
std::string path_from;
|
||||
@ -171,7 +161,7 @@ struct MoveFileOperation final : public IMetadataOperation
|
||||
|
||||
void execute(std::unique_lock<SharedMutex> & metadata_lock) override;
|
||||
|
||||
void undo() override;
|
||||
void undo(std::unique_lock<SharedMutex> & metadata_lock) override;
|
||||
|
||||
private:
|
||||
std::string path_from;
|
||||
@ -186,7 +176,7 @@ struct MoveDirectoryOperation final : public IMetadataOperation
|
||||
|
||||
void execute(std::unique_lock<SharedMutex> & metadata_lock) override;
|
||||
|
||||
void undo() override;
|
||||
void undo(std::unique_lock<SharedMutex> & metadata_lock) override;
|
||||
|
||||
private:
|
||||
std::string path_from;
|
||||
@ -201,7 +191,7 @@ struct ReplaceFileOperation final : public IMetadataOperation
|
||||
|
||||
void execute(std::unique_lock<SharedMutex> & metadata_lock) override;
|
||||
|
||||
void undo() override;
|
||||
void undo(std::unique_lock<SharedMutex> & metadata_lock) override;
|
||||
|
||||
void finalize() override;
|
||||
|
||||
@ -229,7 +219,7 @@ struct AddBlobOperation final : public IMetadataOperation
|
||||
|
||||
void execute(std::unique_lock<SharedMutex> & metadata_lock) override;
|
||||
|
||||
void undo() override;
|
||||
void undo(std::unique_lock<SharedMutex> & metadata_lock) override;
|
||||
|
||||
private:
|
||||
std::string path;
|
||||
@ -257,7 +247,7 @@ struct UnlinkMetadataFileOperation final : public IMetadataOperation
|
||||
|
||||
void execute(std::unique_lock<SharedMutex> & metadata_lock) override;
|
||||
|
||||
void undo() override;
|
||||
void undo(std::unique_lock<SharedMutex> & metadata_lock) override;
|
||||
|
||||
private:
|
||||
std::string path;
|
||||
@ -282,7 +272,7 @@ struct SetReadonlyFileOperation final : public IMetadataOperation
|
||||
|
||||
void execute(std::unique_lock<SharedMutex> & metadata_lock) override;
|
||||
|
||||
void undo() override;
|
||||
void undo(std::unique_lock<SharedMutex> & metadata_lock) override;
|
||||
|
||||
private:
|
||||
std::string path;
|
||||
|
@ -1,18 +1,27 @@
|
||||
#include "MetadataStorageFromPlainObjectStorage.h"
|
||||
#include <Disks/IDisk.h>
|
||||
#include <Disks/ObjectStorages/MetadataStorageFromPlainObjectStorageOperations.h>
|
||||
#include <Disks/ObjectStorages/StaticDirectoryIterator.h>
|
||||
#include <Common/filesystemHelpers.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Common/StringUtils/StringUtils.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
|
||||
#include <Common/filesystemHelpers.h>
|
||||
|
||||
#include <filesystem>
|
||||
#include <tuple>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
MetadataStorageFromPlainObjectStorage::MetadataStorageFromPlainObjectStorage(
|
||||
ObjectStoragePtr object_storage_,
|
||||
String storage_path_prefix_)
|
||||
namespace
|
||||
{
|
||||
|
||||
/// Returns `path` with a trailing directory separator.
/// Appending an empty component adds the separator only when the path ends with a filename.
std::filesystem::path normalizeDirectoryPath(const std::filesystem::path & path)
{
    std::filesystem::path normalized(path);
    normalized /= "";
    return normalized;
}
|
||||
|
||||
}
|
||||
|
||||
MetadataStorageFromPlainObjectStorage::MetadataStorageFromPlainObjectStorage(ObjectStoragePtr object_storage_, String storage_path_prefix_)
|
||||
: object_storage(object_storage_)
|
||||
, storage_path_prefix(std::move(storage_path_prefix_))
|
||||
{
|
||||
@ -20,7 +29,7 @@ MetadataStorageFromPlainObjectStorage::MetadataStorageFromPlainObjectStorage(
|
||||
|
||||
MetadataTransactionPtr MetadataStorageFromPlainObjectStorage::createTransaction()
|
||||
{
|
||||
return std::make_shared<MetadataStorageFromPlainObjectStorageTransaction>(*this);
|
||||
return std::make_shared<MetadataStorageFromPlainObjectStorageTransaction>(*this, object_storage);
|
||||
}
|
||||
|
||||
const std::string & MetadataStorageFromPlainObjectStorage::getPath() const
|
||||
@ -44,10 +53,9 @@ bool MetadataStorageFromPlainObjectStorage::isFile(const std::string & path) con
|
||||
|
||||
bool MetadataStorageFromPlainObjectStorage::isDirectory(const std::string & path) const
|
||||
{
|
||||
auto object_key = object_storage->generateObjectKeyForPath(path);
|
||||
std::string directory = object_key.serialize();
|
||||
if (!directory.ends_with('/'))
|
||||
directory += '/';
|
||||
auto key_prefix = object_storage->generateObjectKeyForPath(path).serialize();
|
||||
auto directory = std::filesystem::path(std::move(key_prefix)) / "";
|
||||
|
||||
return object_storage->existsOrHasAnyChild(directory);
|
||||
}
|
||||
|
||||
@ -62,33 +70,16 @@ uint64_t MetadataStorageFromPlainObjectStorage::getFileSize(const String & path)
|
||||
|
||||
std::vector<std::string> MetadataStorageFromPlainObjectStorage::listDirectory(const std::string & path) const
|
||||
{
|
||||
auto object_key = object_storage->generateObjectKeyForPath(path);
|
||||
auto key_prefix = object_storage->generateObjectKeyForPath(path).serialize();
|
||||
|
||||
RelativePathsWithMetadata files;
|
||||
std::string abs_key = object_key.serialize();
|
||||
std::string abs_key = key_prefix;
|
||||
if (!abs_key.ends_with('/'))
|
||||
abs_key += '/';
|
||||
|
||||
object_storage->listObjects(abs_key, files, 0);
|
||||
|
||||
std::vector<std::string> result;
|
||||
for (const auto & path_size : files)
|
||||
{
|
||||
result.push_back(path_size.relative_path);
|
||||
}
|
||||
|
||||
std::unordered_set<std::string> duplicates_filter;
|
||||
for (auto & row : result)
|
||||
{
|
||||
chassert(row.starts_with(abs_key));
|
||||
row.erase(0, abs_key.size());
|
||||
auto slash_pos = row.find_first_of('/');
|
||||
if (slash_pos != std::string::npos)
|
||||
row.erase(slash_pos, row.size() - slash_pos);
|
||||
duplicates_filter.insert(row);
|
||||
}
|
||||
|
||||
return std::vector<std::string>(duplicates_filter.begin(), duplicates_filter.end());
|
||||
return getDirectChildrenOnDisk(abs_key, files, path);
|
||||
}
|
||||
|
||||
DirectoryIteratorPtr MetadataStorageFromPlainObjectStorage::iterateDirectory(const std::string & path) const
|
||||
@ -108,6 +99,25 @@ StoredObjects MetadataStorageFromPlainObjectStorage::getStorageObjects(const std
|
||||
return {StoredObject(object_key.serialize(), path, object_size)};
|
||||
}
|
||||
|
||||
std::vector<std::string> MetadataStorageFromPlainObjectStorage::getDirectChildrenOnDisk(
|
||||
const std::string & storage_key, const RelativePathsWithMetadata & remote_paths, const std::string & /* local_path */) const
|
||||
{
|
||||
std::unordered_set<std::string> duplicates_filter;
|
||||
for (const auto & elem : remote_paths)
|
||||
{
|
||||
const auto & path = elem.relative_path;
|
||||
chassert(path.find(storage_key) == 0);
|
||||
const auto child_pos = storage_key.size();
|
||||
/// string::npos is ok.
|
||||
const auto slash_pos = path.find('/', child_pos);
|
||||
if (slash_pos == std::string::npos)
|
||||
duplicates_filter.emplace(path.substr(child_pos));
|
||||
else
|
||||
duplicates_filter.emplace(path.substr(child_pos, slash_pos - child_pos));
|
||||
}
|
||||
return std::vector<std::string>(std::make_move_iterator(duplicates_filter.begin()), std::make_move_iterator(duplicates_filter.end()));
|
||||
}
|
||||
|
||||
const IMetadataStorage & MetadataStorageFromPlainObjectStorageTransaction::getStorageForNonTransactionalReads() const
|
||||
{
|
||||
return metadata_storage;
|
||||
@ -122,18 +132,44 @@ void MetadataStorageFromPlainObjectStorageTransaction::unlinkFile(const std::str
|
||||
|
||||
void MetadataStorageFromPlainObjectStorageTransaction::removeDirectory(const std::string & path)
|
||||
{
|
||||
for (auto it = metadata_storage.iterateDirectory(path); it->isValid(); it->next())
|
||||
metadata_storage.object_storage->removeObject(StoredObject(it->path()));
|
||||
if (metadata_storage.object_storage->isWriteOnce())
|
||||
{
|
||||
for (auto it = metadata_storage.iterateDirectory(path); it->isValid(); it->next())
|
||||
metadata_storage.object_storage->removeObject(StoredObject(it->path()));
|
||||
}
|
||||
else
|
||||
{
|
||||
addOperation(std::make_unique<MetadataStorageFromPlainObjectStorageRemoveDirectoryOperation>(
|
||||
normalizeDirectoryPath(path), *metadata_storage.getPathMap(), object_storage));
|
||||
}
|
||||
}
|
||||
|
||||
void MetadataStorageFromPlainObjectStorageTransaction::createDirectory(const std::string &)
|
||||
void MetadataStorageFromPlainObjectStorageTransaction::createDirectory(const std::string & path)
|
||||
{
|
||||
/// Noop. It is an Object Storage not a filesystem.
|
||||
if (metadata_storage.object_storage->isWriteOnce())
|
||||
return;
|
||||
|
||||
auto normalized_path = normalizeDirectoryPath(path);
|
||||
auto key_prefix = object_storage->generateObjectKeyPrefixForDirectoryPath(normalized_path).serialize();
|
||||
auto op = std::make_unique<MetadataStorageFromPlainObjectStorageCreateDirectoryOperation>(
|
||||
std::move(normalized_path), std::move(key_prefix), *metadata_storage.getPathMap(), object_storage);
|
||||
addOperation(std::move(op));
|
||||
}
|
||||
void MetadataStorageFromPlainObjectStorageTransaction::createDirectoryRecursive(const std::string &)
|
||||
|
||||
void MetadataStorageFromPlainObjectStorageTransaction::createDirectoryRecursive(const std::string & path)
|
||||
{
|
||||
/// Noop. It is an Object Storage not a filesystem.
|
||||
return createDirectory(path);
|
||||
}
|
||||
|
||||
void MetadataStorageFromPlainObjectStorageTransaction::moveDirectory(const std::string & path_from, const std::string & path_to)
|
||||
{
|
||||
if (metadata_storage.object_storage->isWriteOnce())
|
||||
throwNotImplemented();
|
||||
|
||||
addOperation(std::make_unique<MetadataStorageFromPlainObjectStorageMoveDirectoryOperation>(
|
||||
normalizeDirectoryPath(path_from), normalizeDirectoryPath(path_to), *metadata_storage.getPathMap(), object_storage));
|
||||
}
|
||||
|
||||
void MetadataStorageFromPlainObjectStorageTransaction::addBlobToMetadata(
|
||||
const std::string &, ObjectStorageKey /* object_key */, uint64_t /* size_in_bytes */)
|
||||
{
|
||||
@ -146,4 +182,8 @@ UnlinkMetadataFileOperationOutcomePtr MetadataStorageFromPlainObjectStorageTrans
|
||||
return std::make_shared<UnlinkMetadataFileOperationOutcome>(UnlinkMetadataFileOperationOutcome{0});
|
||||
}
|
||||
|
||||
void MetadataStorageFromPlainObjectStorageTransaction::commit()
|
||||
{
|
||||
MetadataOperationsHolder::commitImpl(metadata_storage.metadata_mutex);
|
||||
}
|
||||
}
|
||||
|
@ -2,9 +2,10 @@
|
||||
|
||||
#include <Disks/IDisk.h>
|
||||
#include <Disks/ObjectStorages/IMetadataStorage.h>
|
||||
#include <Disks/ObjectStorages/MetadataFromDiskTransactionState.h>
|
||||
#include <Disks/ObjectStorages/MetadataStorageFromDiskTransactionOperations.h>
|
||||
#include <Disks/ObjectStorages/MetadataOperationsHolder.h>
|
||||
#include <Disks/ObjectStorages/MetadataStorageTransactionState.h>
|
||||
|
||||
#include <map>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -23,14 +24,21 @@ using UnlinkMetadataFileOperationOutcomePtr = std::shared_ptr<UnlinkMetadataFile
|
||||
/// It is used to allow BACKUP/RESTORE to ObjectStorage (S3/...) with the same
|
||||
/// structure as on disk MergeTree, and does not require metadata from local
|
||||
/// disk to restore.
|
||||
class MetadataStorageFromPlainObjectStorage final : public IMetadataStorage
|
||||
class MetadataStorageFromPlainObjectStorage : public IMetadataStorage
|
||||
{
|
||||
public:
|
||||
/// Local path prefixes mapped to storage key prefixes.
|
||||
using PathMap = std::map<std::filesystem::path, std::string>;
|
||||
|
||||
private:
|
||||
friend class MetadataStorageFromPlainObjectStorageTransaction;
|
||||
|
||||
protected:
|
||||
ObjectStoragePtr object_storage;
|
||||
String storage_path_prefix;
|
||||
|
||||
mutable SharedMutex metadata_mutex;
|
||||
|
||||
public:
|
||||
MetadataStorageFromPlainObjectStorage(ObjectStoragePtr object_storage_, String storage_path_prefix_);
|
||||
|
||||
@ -69,23 +77,37 @@ public:
|
||||
|
||||
bool supportsChmod() const override { return false; }
|
||||
bool supportsStat() const override { return false; }
|
||||
|
||||
protected:
|
||||
virtual std::shared_ptr<PathMap> getPathMap() const { throwNotImplemented(); }
|
||||
|
||||
virtual std::vector<std::string> getDirectChildrenOnDisk(
|
||||
const std::string & storage_key, const RelativePathsWithMetadata & remote_paths, const std::string & local_path) const;
|
||||
};
|
||||
|
||||
class MetadataStorageFromPlainObjectStorageTransaction final : public IMetadataTransaction
|
||||
class MetadataStorageFromPlainObjectStorageTransaction final : public IMetadataTransaction, private MetadataOperationsHolder
|
||||
{
|
||||
private:
|
||||
const MetadataStorageFromPlainObjectStorage & metadata_storage;
|
||||
MetadataStorageFromPlainObjectStorage & metadata_storage;
|
||||
ObjectStoragePtr object_storage;
|
||||
|
||||
std::vector<MetadataOperationPtr> operations;
|
||||
|
||||
public:
|
||||
explicit MetadataStorageFromPlainObjectStorageTransaction(const MetadataStorageFromPlainObjectStorage & metadata_storage_)
|
||||
: metadata_storage(metadata_storage_)
|
||||
explicit MetadataStorageFromPlainObjectStorageTransaction(
|
||||
MetadataStorageFromPlainObjectStorage & metadata_storage_, ObjectStoragePtr object_storage_)
|
||||
: metadata_storage(metadata_storage_), object_storage(object_storage_)
|
||||
{}
|
||||
|
||||
const IMetadataStorage & getStorageForNonTransactionalReads() const override;
|
||||
|
||||
void addBlobToMetadata(const std::string & path, ObjectStorageKey object_key, uint64_t size_in_bytes) override;
|
||||
|
||||
void setLastModified(const String &, const Poco::Timestamp &) override
|
||||
{
|
||||
/// Noop
|
||||
}
|
||||
|
||||
void createEmptyMetadataFile(const std::string & /* path */) override
|
||||
{
|
||||
/// No metadata, no need to create anything.
|
||||
@ -100,17 +122,15 @@ public:
|
||||
|
||||
void createDirectoryRecursive(const std::string & path) override;
|
||||
|
||||
void moveDirectory(const std::string & path_from, const std::string & path_to) override;
|
||||
|
||||
void unlinkFile(const std::string & path) override;
|
||||
void removeDirectory(const std::string & path) override;
|
||||
|
||||
UnlinkMetadataFileOperationOutcomePtr unlinkMetadata(const std::string & path) override;
|
||||
|
||||
void commit() override
|
||||
{
|
||||
/// TODO: rewrite with transactions
|
||||
}
|
||||
void commit() override;
|
||||
|
||||
bool supportsChmod() const override { return false; }
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,190 @@
|
||||
#include "MetadataStorageFromPlainObjectStorageOperations.h"
|
||||
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/logger_useful.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int FILE_DOESNT_EXIST;
|
||||
extern const int FILE_ALREADY_EXISTS;
|
||||
extern const int INCORRECT_DATA;
|
||||
};
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
constexpr auto PREFIX_PATH_FILE_NAME = "prefix.path";
|
||||
|
||||
}
|
||||
|
||||
/// Prepares the creation of the metadata object for directory `path_` under `key_prefix_`.
/// Nothing is written here; the object is created in execute().
MetadataStorageFromPlainObjectStorageCreateDirectoryOperation::MetadataStorageFromPlainObjectStorageCreateDirectoryOperation(
    std::filesystem::path && path_,
    std::string && key_prefix_,
    MetadataStorageFromPlainObjectStorage::PathMap & path_map_,
    ObjectStoragePtr object_storage_)
    /// A named rvalue reference is an lvalue: key_prefix_ has to be moved explicitly, otherwise it is copied.
    : path(std::move(path_)), key_prefix(std::move(key_prefix_)), path_map(path_map_), object_storage(object_storage_)
{
}
|
||||
|
||||
void MetadataStorageFromPlainObjectStorageCreateDirectoryOperation::execute(std::unique_lock<SharedMutex> &)
|
||||
{
|
||||
if (path_map.contains(path))
|
||||
return;
|
||||
|
||||
LOG_TRACE(getLogger("MetadataStorageFromPlainObjectStorageCreateDirectoryOperation"), "Creating metadata for directory '{}'", path);
|
||||
|
||||
auto object_key = ObjectStorageKey::createAsRelative(key_prefix, PREFIX_PATH_FILE_NAME);
|
||||
|
||||
auto object = StoredObject(object_key.serialize(), path / PREFIX_PATH_FILE_NAME);
|
||||
auto buf = object_storage->writeObject(
|
||||
object,
|
||||
WriteMode::Rewrite,
|
||||
/* object_attributes */ std::nullopt,
|
||||
/* buf_size */ DBMS_DEFAULT_BUFFER_SIZE,
|
||||
/* settings */ {});
|
||||
|
||||
write_created = true;
|
||||
|
||||
[[maybe_unused]] auto result = path_map.emplace(path, std::move(key_prefix));
|
||||
chassert(result.second);
|
||||
|
||||
writeString(path.string(), *buf);
|
||||
buf->finalize();
|
||||
|
||||
write_finalized = true;
|
||||
}
|
||||
|
||||
void MetadataStorageFromPlainObjectStorageCreateDirectoryOperation::undo(std::unique_lock<SharedMutex> &)
|
||||
{
|
||||
auto object_key = ObjectStorageKey::createAsRelative(key_prefix, PREFIX_PATH_FILE_NAME);
|
||||
if (write_finalized)
|
||||
{
|
||||
path_map.erase(path);
|
||||
object_storage->removeObject(StoredObject(object_key.serialize(), path / PREFIX_PATH_FILE_NAME));
|
||||
}
|
||||
else if (write_created)
|
||||
object_storage->removeObjectIfExists(StoredObject(object_key.serialize(), path / PREFIX_PATH_FILE_NAME));
|
||||
}
|
||||
|
||||
/// Prepares a rename of directory `path_from_` to `path_to_`; the actual work happens in execute().
MetadataStorageFromPlainObjectStorageMoveDirectoryOperation::MetadataStorageFromPlainObjectStorageMoveDirectoryOperation(
    std::filesystem::path && path_from_,
    std::filesystem::path && path_to_,
    MetadataStorageFromPlainObjectStorage::PathMap & path_map_,
    ObjectStoragePtr object_storage_)
    : path_from(std::move(path_from_))
    , path_to(std::move(path_to_))
    , path_map(path_map_)
    , object_storage(object_storage_)
{
}
|
||||
|
||||
std::unique_ptr<WriteBufferFromFileBase> MetadataStorageFromPlainObjectStorageMoveDirectoryOperation::createWriteBuf(
|
||||
const std::filesystem::path & expected_path, const std::filesystem::path & new_path, bool validate_content)
|
||||
{
|
||||
auto expected_it = path_map.find(expected_path);
|
||||
if (expected_it == path_map.end())
|
||||
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "Metadata object for the expected (source) path '{}' does not exist", expected_path);
|
||||
|
||||
if (path_map.contains(new_path))
|
||||
throw Exception(ErrorCodes::FILE_ALREADY_EXISTS, "Metadata object for the new (destination) path '{}' already exists", new_path);
|
||||
|
||||
auto object_key = ObjectStorageKey::createAsRelative(expected_it->second, PREFIX_PATH_FILE_NAME);
|
||||
|
||||
auto object = StoredObject(object_key.serialize(), expected_path / PREFIX_PATH_FILE_NAME);
|
||||
|
||||
if (validate_content)
|
||||
{
|
||||
std::string data;
|
||||
auto read_buf = object_storage->readObject(object);
|
||||
readStringUntilEOF(data, *read_buf);
|
||||
if (data != path_from)
|
||||
throw Exception(
|
||||
ErrorCodes::INCORRECT_DATA,
|
||||
"Incorrect data for object key {}, expected {}, got {}",
|
||||
object_key.serialize(),
|
||||
expected_path,
|
||||
data);
|
||||
}
|
||||
|
||||
auto write_buf = object_storage->writeObject(
|
||||
object,
|
||||
WriteMode::Rewrite,
|
||||
/* object_attributes */ std::nullopt,
|
||||
/*buf_size*/ DBMS_DEFAULT_BUFFER_SIZE,
|
||||
/*settings*/ {});
|
||||
|
||||
return write_buf;
|
||||
}
|
||||
|
||||
void MetadataStorageFromPlainObjectStorageMoveDirectoryOperation::execute(std::unique_lock<SharedMutex> & /* metadata_lock */)
|
||||
{
|
||||
LOG_TRACE(
|
||||
getLogger("MetadataStorageFromPlainObjectStorageMoveDirectoryOperation"), "Moving directory '{}' to '{}'", path_from, path_to);
|
||||
|
||||
auto write_buf = createWriteBuf(path_from, path_to, /* validate_content */ true);
|
||||
write_created = true;
|
||||
writeString(path_to.string(), *write_buf);
|
||||
write_buf->finalize();
|
||||
|
||||
[[maybe_unused]] auto result = path_map.emplace(path_to, path_map.extract(path_from).mapped());
|
||||
chassert(result.second);
|
||||
|
||||
write_finalized = true;
|
||||
}
|
||||
|
||||
void MetadataStorageFromPlainObjectStorageMoveDirectoryOperation::undo(std::unique_lock<SharedMutex> &)
|
||||
{
|
||||
if (write_finalized)
|
||||
path_map.emplace(path_from, path_map.extract(path_to).mapped());
|
||||
|
||||
if (write_created)
|
||||
{
|
||||
auto write_buf = createWriteBuf(path_to, path_from, /* verify_content */ false);
|
||||
writeString(path_from.string(), *write_buf);
|
||||
write_buf->finalize();
|
||||
}
|
||||
}
|
||||
|
||||
/// Prepares the removal of the metadata object of directory `path_`; the actual work happens in execute().
MetadataStorageFromPlainObjectStorageRemoveDirectoryOperation::MetadataStorageFromPlainObjectStorageRemoveDirectoryOperation(
    std::filesystem::path && path_,
    MetadataStorageFromPlainObjectStorage::PathMap & path_map_,
    ObjectStoragePtr object_storage_)
    : path(std::move(path_))
    , path_map(path_map_)
    , object_storage(object_storage_)
{
}
|
||||
|
||||
void MetadataStorageFromPlainObjectStorageRemoveDirectoryOperation::execute(std::unique_lock<SharedMutex> & /* metadata_lock */)
|
||||
{
|
||||
auto path_it = path_map.find(path);
|
||||
if (path_it == path_map.end())
|
||||
return;
|
||||
|
||||
LOG_TRACE(getLogger("MetadataStorageFromPlainObjectStorageRemoveDirectoryOperation"), "Removing directory '{}'", path);
|
||||
|
||||
key_prefix = path_it->second;
|
||||
auto object_key = ObjectStorageKey::createAsRelative(key_prefix, PREFIX_PATH_FILE_NAME);
|
||||
auto object = StoredObject(object_key.serialize(), path / PREFIX_PATH_FILE_NAME);
|
||||
object_storage->removeObject(object);
|
||||
path_map.erase(path_it);
|
||||
}
|
||||
|
||||
void MetadataStorageFromPlainObjectStorageRemoveDirectoryOperation::undo(std::unique_lock<SharedMutex> &)
|
||||
{
|
||||
if (!removed)
|
||||
return;
|
||||
|
||||
auto object_key = ObjectStorageKey::createAsRelative(key_prefix, PREFIX_PATH_FILE_NAME);
|
||||
auto object = StoredObject(object_key.serialize(), path / PREFIX_PATH_FILE_NAME);
|
||||
auto buf = object_storage->writeObject(
|
||||
object,
|
||||
WriteMode::Rewrite,
|
||||
/* object_attributes */ std::nullopt,
|
||||
/* buf_size */ DBMS_DEFAULT_BUFFER_SIZE,
|
||||
/* settings */ {});
|
||||
writeString(path.string(), *buf);
|
||||
buf->finalize();
|
||||
|
||||
path_map.emplace(path, std::move(key_prefix));
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,80 @@
|
||||
#pragma once
|
||||
|
||||
#include <Disks/ObjectStorages/IMetadataOperation.h>
|
||||
#include <Disks/ObjectStorages/MetadataStorageFromPlainObjectStorage.h>
|
||||
|
||||
#include <filesystem>
|
||||
#include <map>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class MetadataStorageFromPlainObjectStorageCreateDirectoryOperation final : public IMetadataOperation
|
||||
{
|
||||
private:
|
||||
std::filesystem::path path;
|
||||
std::string key_prefix;
|
||||
MetadataStorageFromPlainObjectStorage::PathMap & path_map;
|
||||
ObjectStoragePtr object_storage;
|
||||
|
||||
bool write_created = false;
|
||||
bool write_finalized = false;
|
||||
|
||||
public:
|
||||
// Assuming that paths are normalized.
|
||||
MetadataStorageFromPlainObjectStorageCreateDirectoryOperation(
|
||||
std::filesystem::path && path_,
|
||||
std::string && key_prefix_,
|
||||
MetadataStorageFromPlainObjectStorage::PathMap & path_map_,
|
||||
ObjectStoragePtr object_storage_);
|
||||
|
||||
void execute(std::unique_lock<SharedMutex> & metadata_lock) override;
|
||||
void undo(std::unique_lock<SharedMutex> & metadata_lock) override;
|
||||
};
|
||||
|
||||
class MetadataStorageFromPlainObjectStorageMoveDirectoryOperation final : public IMetadataOperation
|
||||
{
|
||||
private:
|
||||
std::filesystem::path path_from;
|
||||
std::filesystem::path path_to;
|
||||
MetadataStorageFromPlainObjectStorage::PathMap & path_map;
|
||||
ObjectStoragePtr object_storage;
|
||||
|
||||
bool write_created = false;
|
||||
bool write_finalized = false;
|
||||
|
||||
std::unique_ptr<WriteBufferFromFileBase>
|
||||
createWriteBuf(const std::filesystem::path & expected_path, const std::filesystem::path & new_path, bool validate_content);
|
||||
|
||||
public:
|
||||
MetadataStorageFromPlainObjectStorageMoveDirectoryOperation(
|
||||
std::filesystem::path && path_from_,
|
||||
std::filesystem::path && path_to_,
|
||||
MetadataStorageFromPlainObjectStorage::PathMap & path_map_,
|
||||
ObjectStoragePtr object_storage_);
|
||||
|
||||
void execute(std::unique_lock<SharedMutex> & metadata_lock) override;
|
||||
|
||||
void undo(std::unique_lock<SharedMutex> & metadata_lock) override;
|
||||
};
|
||||
|
||||
class MetadataStorageFromPlainObjectStorageRemoveDirectoryOperation final : public IMetadataOperation
|
||||
{
|
||||
private:
|
||||
std::filesystem::path path;
|
||||
|
||||
MetadataStorageFromPlainObjectStorage::PathMap & path_map;
|
||||
ObjectStoragePtr object_storage;
|
||||
|
||||
std::string key_prefix;
|
||||
bool removed = false;
|
||||
|
||||
public:
|
||||
MetadataStorageFromPlainObjectStorageRemoveDirectoryOperation(
|
||||
std::filesystem::path && path_, MetadataStorageFromPlainObjectStorage::PathMap & path_map_, ObjectStoragePtr object_storage_);
|
||||
|
||||
void execute(std::unique_lock<SharedMutex> & metadata_lock) override;
|
||||
void undo(std::unique_lock<SharedMutex> & metadata_lock) override;
|
||||
};
|
||||
|
||||
}
|
@ -0,0 +1,143 @@
|
||||
#include <Disks/ObjectStorages/MetadataStorageFromPlainRewritableObjectStorage.h>
|
||||
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <Common/ErrorCodes.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include "CommonPathPrefixKeyGenerator.h"
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
constexpr auto PREFIX_PATH_FILE_NAME = "prefix.path";
|
||||
|
||||
/// Builds the local path -> remote key prefix map by listing everything under `root` and reading
/// each object whose file name is PREFIX_PATH_FILE_NAME. The content of such an object is the local
/// path; the parent of its remote path is the key prefix.
MetadataStorageFromPlainObjectStorage::PathMap loadPathPrefixMap(const std::string & root, ObjectStoragePtr object_storage)
{
    RelativePathsWithMetadata listed_objects;
    object_storage->listObjects(root, listed_objects, 0);

    MetadataStorageFromPlainObjectStorage::PathMap prefix_map;
    for (const auto & listed : listed_objects)
    {
        const std::filesystem::path remote_path(listed.relative_path);
        if (remote_path.filename() != PREFIX_PATH_FILE_NAME)
            continue;

        StoredObject prefix_object{listed.relative_path};
        auto prefix_buf = object_storage->readObject(prefix_object);

        String local_path;
        readStringUntilEOF(local_path, *prefix_buf);

        chassert(remote_path.has_parent_path());
        const auto insertion = prefix_map.emplace(local_path, remote_path.parent_path());
        if (insertion.second)
            continue;

        /// This can happen if table replication is enabled, then the same local path is written
        /// in `prefix.path` of each replica.
        /// TODO: should replicated tables (e.g., RMT) be explicitly disallowed?
        LOG_WARNING(
            getLogger("MetadataStorageFromPlainObjectStorage"),
            "The local path '{}' is already mapped to a remote path '{}', ignoring: '{}'",
            local_path,
            insertion.first->second,
            remote_path.parent_path().string());
    }

    return prefix_map;
}
|
||||
|
||||
std::vector<std::string> getDirectChildrenOnRewritableDisk(
|
||||
const std::string & storage_key,
|
||||
const RelativePathsWithMetadata & remote_paths,
|
||||
const std::string & local_path,
|
||||
const MetadataStorageFromPlainObjectStorage::PathMap & local_path_prefixes,
|
||||
SharedMutex & shared_mutex)
|
||||
{
|
||||
using PathMap = MetadataStorageFromPlainObjectStorage::PathMap;
|
||||
|
||||
std::unordered_set<std::string> duplicates_filter;
|
||||
|
||||
/// Map remote paths into local subdirectories.
|
||||
std::unordered_map<PathMap::mapped_type, PathMap::key_type> remote_to_local_subdir;
|
||||
|
||||
{
|
||||
std::shared_lock lock(shared_mutex);
|
||||
auto end_it = local_path_prefixes.end();
|
||||
for (auto it = local_path_prefixes.lower_bound(local_path); it != end_it; ++it)
|
||||
{
|
||||
const auto & [k, v] = std::make_tuple(it->first.string(), it->second);
|
||||
if (!k.starts_with(local_path))
|
||||
break;
|
||||
|
||||
auto slash_num = count(k.begin() + local_path.size(), k.end(), '/');
|
||||
if (slash_num != 1)
|
||||
continue;
|
||||
|
||||
chassert(k.back() == '/');
|
||||
remote_to_local_subdir.emplace(v, std::string(k.begin() + local_path.size(), k.end() - 1));
|
||||
}
|
||||
}
|
||||
|
||||
auto skip_list = std::set<std::string>{PREFIX_PATH_FILE_NAME};
|
||||
for (const auto & elem : remote_paths)
|
||||
{
|
||||
const auto & path = elem.relative_path;
|
||||
chassert(path.find(storage_key) == 0);
|
||||
const auto child_pos = storage_key.size();
|
||||
|
||||
auto slash_pos = path.find('/', child_pos);
|
||||
|
||||
if (slash_pos == std::string::npos)
|
||||
{
|
||||
/// File names.
|
||||
auto filename = path.substr(child_pos);
|
||||
if (!skip_list.contains(filename))
|
||||
duplicates_filter.emplace(std::move(filename));
|
||||
}
|
||||
else
|
||||
{
|
||||
/// Subdirectories.
|
||||
auto it = remote_to_local_subdir.find(path.substr(0, slash_pos));
|
||||
/// Mapped subdirectories.
|
||||
if (it != remote_to_local_subdir.end())
|
||||
duplicates_filter.emplace(it->second);
|
||||
/// The remote subdirectory name is the same as the local subdirectory.
|
||||
else
|
||||
duplicates_filter.emplace(path.substr(child_pos, slash_pos - child_pos));
|
||||
}
|
||||
}
|
||||
|
||||
return std::vector<std::string>(std::make_move_iterator(duplicates_filter.begin()), std::make_move_iterator(duplicates_filter.end()));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/// Loads the path map from the objects found under the storage's common key prefix, rejects
/// write-once storages, and installs a CommonPathPrefixKeyGenerator (backed by that map) on the
/// object storage.
MetadataStorageFromPlainRewritableObjectStorage::MetadataStorageFromPlainRewritableObjectStorage(
    ObjectStoragePtr object_storage_, String storage_path_prefix_)
    : MetadataStorageFromPlainObjectStorage(object_storage_, storage_path_prefix_)
    , path_map(std::make_shared<PathMap>(loadPathPrefixMap(object_storage->getCommonKeyPrefix(), object_storage)))
{
    const bool write_once = object_storage->isWriteOnce();
    if (write_once)
    {
        throw Exception(
            ErrorCodes::LOGICAL_ERROR,
            "MetadataStorageFromPlainRewritableObjectStorage is not compatible with write-once storage '{}'",
            object_storage->getName());
    }

    /// The generator shares the path map and the metadata mutex with this metadata storage.
    object_storage->setKeysGenerator(
        std::make_shared<CommonPathPrefixKeyGenerator>(object_storage->getCommonKeyPrefix(), metadata_mutex, path_map));
}
|
||||
|
||||
std::vector<std::string> MetadataStorageFromPlainRewritableObjectStorage::getDirectChildrenOnDisk(
|
||||
const std::string & storage_key, const RelativePathsWithMetadata & remote_paths, const std::string & local_path) const
|
||||
{
|
||||
return getDirectChildrenOnRewritableDisk(storage_key, remote_paths, local_path, *getPathMap(), metadata_mutex);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,26 @@
|
||||
#pragma once
|
||||
|
||||
#include <Disks/ObjectStorages/MetadataStorageFromPlainObjectStorage.h>
|
||||
|
||||
#include <memory>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class MetadataStorageFromPlainRewritableObjectStorage final : public MetadataStorageFromPlainObjectStorage
|
||||
{
|
||||
private:
|
||||
std::shared_ptr<PathMap> path_map;
|
||||
|
||||
public:
|
||||
MetadataStorageFromPlainRewritableObjectStorage(ObjectStoragePtr object_storage_, String storage_path_prefix_);
|
||||
|
||||
MetadataStorageType getType() const override { return MetadataStorageType::PlainRewritable; }
|
||||
|
||||
protected:
|
||||
std::shared_ptr<PathMap> getPathMap() const override { return path_map; }
|
||||
std::vector<std::string> getDirectChildrenOnDisk(
|
||||
const std::string & storage_key, const RelativePathsWithMetadata & remote_paths, const std::string & local_path) const override;
|
||||
};
|
||||
|
||||
}
|
23
src/Disks/ObjectStorages/MetadataStorageTransactionState.cpp
Normal file
23
src/Disks/ObjectStorages/MetadataStorageTransactionState.cpp
Normal file
@ -0,0 +1,23 @@
|
||||
#include <Disks/ObjectStorages/MetadataStorageTransactionState.h>
|
||||
#include <base/defines.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
std::string toString(MetadataStorageTransactionState state)
|
||||
{
|
||||
switch (state)
|
||||
{
|
||||
case MetadataStorageTransactionState::PREPARING:
|
||||
return "PREPARING";
|
||||
case MetadataStorageTransactionState::FAILED:
|
||||
return "FAILED";
|
||||
case MetadataStorageTransactionState::COMMITTED:
|
||||
return "COMMITTED";
|
||||
case MetadataStorageTransactionState::PARTIALLY_ROLLED_BACK:
|
||||
return "PARTIALLY_ROLLED_BACK";
|
||||
}
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
}
|
@ -4,7 +4,7 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
enum class MetadataFromDiskTransactionState
|
||||
enum class MetadataStorageTransactionState
|
||||
{
|
||||
PREPARING,
|
||||
FAILED,
|
||||
@ -12,6 +12,5 @@ enum class MetadataFromDiskTransactionState
|
||||
PARTIALLY_ROLLED_BACK,
|
||||
};
|
||||
|
||||
std::string toString(MetadataFromDiskTransactionState state);
|
||||
|
||||
std::string toString(MetadataStorageTransactionState state);
|
||||
}
|
@ -1,9 +1,11 @@
|
||||
#include "config.h"
|
||||
#include <utility>
|
||||
#include <Disks/ObjectStorages/ObjectStorageFactory.h>
|
||||
#include "Disks/DiskType.h"
|
||||
#include "config.h"
|
||||
#if USE_AWS_S3
|
||||
#include <Disks/ObjectStorages/S3/DiskS3Utils.h>
|
||||
#include <Disks/ObjectStorages/S3/S3ObjectStorage.h>
|
||||
#include <Disks/ObjectStorages/S3/diskSettings.h>
|
||||
#include <Disks/ObjectStorages/S3/DiskS3Utils.h>
|
||||
#endif
|
||||
#if USE_HDFS && !defined(CLICKHOUSE_KEEPER_STANDALONE_BUILD)
|
||||
#include <Disks/ObjectStorages/HDFS/HDFSObjectStorage.h>
|
||||
@ -20,6 +22,7 @@
|
||||
#endif
|
||||
#include <Disks/ObjectStorages/MetadataStorageFactory.h>
|
||||
#include <Disks/ObjectStorages/PlainObjectStorage.h>
|
||||
#include <Disks/ObjectStorages/PlainRewritableObjectStorage.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Common/Macros.h>
|
||||
|
||||
@ -35,36 +38,50 @@ namespace ErrorCodes
|
||||
extern const int UNKNOWN_ELEMENT_IN_CONFIG;
|
||||
extern const int BAD_ARGUMENTS;
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int NOT_IMPLEMENTED;
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
bool isPlainStorage(
|
||||
ObjectStorageType type,
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
const std::string & config_prefix)
|
||||
{
|
||||
auto compatibility_hint = MetadataStorageFactory::getCompatibilityMetadataTypeHint(type);
|
||||
auto metadata_type = MetadataStorageFactory::getMetadataType(config, config_prefix, compatibility_hint);
|
||||
return metadataTypeFromString(metadata_type) == MetadataStorageType::Plain;
|
||||
}
|
||||
|
||||
template <typename BaseObjectStorage, class ...Args>
|
||||
ObjectStoragePtr createObjectStorage(
|
||||
ObjectStorageType type,
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
const std::string & config_prefix,
|
||||
Args && ...args)
|
||||
bool isCompatibleWithMetadataStorage(
|
||||
ObjectStorageType storage_type,
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
const std::string & config_prefix,
|
||||
MetadataStorageType target_metadata_type)
|
||||
{
|
||||
auto compatibility_hint = MetadataStorageFactory::getCompatibilityMetadataTypeHint(storage_type);
|
||||
auto metadata_type = MetadataStorageFactory::getMetadataType(config, config_prefix, compatibility_hint);
|
||||
return metadataTypeFromString(metadata_type) == target_metadata_type;
|
||||
}
|
||||
|
||||
bool isPlainStorage(ObjectStorageType type, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix)
|
||||
{
|
||||
return isCompatibleWithMetadataStorage(type, config, config_prefix, MetadataStorageType::Plain);
|
||||
}
|
||||
|
||||
bool isPlainRewritableStorage(ObjectStorageType type, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix)
|
||||
{
|
||||
return isCompatibleWithMetadataStorage(type, config, config_prefix, MetadataStorageType::PlainRewritable);
|
||||
}
|
||||
|
||||
template <typename BaseObjectStorage, class... Args>
|
||||
ObjectStoragePtr createObjectStorage(
|
||||
ObjectStorageType type, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, Args &&... args)
|
||||
{
|
||||
if (isPlainStorage(type, config, config_prefix))
|
||||
return std::make_shared<PlainObjectStorage<BaseObjectStorage>>(std::forward<Args>(args)...);
|
||||
else if (isPlainRewritableStorage(type, config, config_prefix))
|
||||
{
|
||||
if (isPlainStorage(type, config, config_prefix))
|
||||
{
|
||||
return std::make_shared<PlainObjectStorage<BaseObjectStorage>>(std::forward<Args>(args)...);
|
||||
}
|
||||
else
|
||||
{
|
||||
return std::make_shared<BaseObjectStorage>(std::forward<Args>(args)...);
|
||||
}
|
||||
/// TODO(jkartseva@): Test support for generic disk type
|
||||
if (type != ObjectStorageType::S3)
|
||||
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "plain_rewritable metadata storage support is implemented only for S3");
|
||||
|
||||
return std::make_shared<PlainRewritableObjectStorage<BaseObjectStorage>>(std::forward<Args>(args)...);
|
||||
}
|
||||
else
|
||||
return std::make_shared<BaseObjectStorage>(std::forward<Args>(args)...);
|
||||
}
|
||||
}
|
||||
|
||||
ObjectStorageFactory & ObjectStorageFactory::instance()
|
||||
@ -76,10 +93,7 @@ ObjectStorageFactory & ObjectStorageFactory::instance()
|
||||
void ObjectStorageFactory::registerObjectStorageType(const std::string & type, Creator creator)
|
||||
{
|
||||
if (!registry.emplace(type, creator).second)
|
||||
{
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR,
|
||||
"ObjectStorageFactory: the metadata type '{}' is not unique", type);
|
||||
}
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "ObjectStorageFactory: the metadata type '{}' is not unique", type);
|
||||
}
|
||||
|
||||
ObjectStoragePtr ObjectStorageFactory::create(
|
||||
@ -91,13 +105,9 @@ ObjectStoragePtr ObjectStorageFactory::create(
|
||||
{
|
||||
std::string type;
|
||||
if (config.has(config_prefix + ".object_storage_type"))
|
||||
{
|
||||
type = config.getString(config_prefix + ".object_storage_type");
|
||||
}
|
||||
else if (config.has(config_prefix + ".type")) /// .type -- for compatibility.
|
||||
{
|
||||
type = config.getString(config_prefix + ".type");
|
||||
}
|
||||
else
|
||||
{
|
||||
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "Expected `object_storage_type` in config");
|
||||
@ -210,31 +220,66 @@ void registerS3PlainObjectStorage(ObjectStorageFactory & factory)
|
||||
return object_storage;
|
||||
});
|
||||
}
|
||||
|
||||
/// Registers the creator for the `s3_plain_rewritable` object storage type.
/// The creator rejects configurations with `send_metadata` enabled, builds a
/// PlainRewritableObjectStorage<S3ObjectStorage> from the disk configuration and, unless
/// `skip_access_check` is set, checks the S3 capabilities.
void registerS3PlainRewritableObjectStorage(ObjectStorageFactory & factory)
{
    static constexpr auto disk_type = "s3_plain_rewritable";

    factory.registerObjectStorageType(
        disk_type,
        [](const std::string & name,
           const Poco::Util::AbstractConfiguration & config,
           const std::string & config_prefix,
           const ContextPtr & context,
           bool skip_access_check) -> ObjectStoragePtr
        {
            /// send_metadata changes the filenames (includes revision), while
            /// s3_plain_rewritable does not support file renaming.
            if (config.getBool(config_prefix + ".send_metadata", false))
                throw Exception(ErrorCodes::BAD_ARGUMENTS, "s3_plain_rewritable does not support send_metadata");

            auto uri = getS3URI(config, config_prefix, context);
            auto s3_capabilities = getCapabilitiesFromConfig(config, config_prefix);
            auto settings = getSettings(config, config_prefix, context);
            auto client = getClient(config, config_prefix, context, *settings);
            auto key_generator = getKeyGenerator(uri, config, config_prefix);

            auto object_storage = std::make_shared<PlainRewritableObjectStorage<S3ObjectStorage>>(
                std::move(client), std::move(settings), uri, s3_capabilities, key_generator, name);

            /// NOTE: should we still perform this check for clickhouse-disks?
            if (!skip_access_check)
                checkS3Capabilities(*dynamic_cast<S3ObjectStorage *>(object_storage.get()), s3_capabilities, name);

            return object_storage;
        });
}
|
||||
|
||||
#endif
|
||||
|
||||
#if USE_HDFS && !defined(CLICKHOUSE_KEEPER_STANDALONE_BUILD)
|
||||
void registerHDFSObjectStorage(ObjectStorageFactory & factory)
|
||||
{
|
||||
factory.registerObjectStorageType("hdfs", [](
|
||||
const std::string & /* name */,
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
const std::string & config_prefix,
|
||||
const ContextPtr & context,
|
||||
bool /* skip_access_check */) -> ObjectStoragePtr
|
||||
{
|
||||
auto uri = context->getMacros()->expand(config.getString(config_prefix + ".endpoint"));
|
||||
checkHDFSURL(uri);
|
||||
if (uri.back() != '/')
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "HDFS path must ends with '/', but '{}' doesn't.", uri);
|
||||
factory.registerObjectStorageType(
|
||||
"hdfs",
|
||||
[](const std::string & /* name */,
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
const std::string & config_prefix,
|
||||
const ContextPtr & context,
|
||||
bool /* skip_access_check */) -> ObjectStoragePtr
|
||||
{
|
||||
auto uri = context->getMacros()->expand(config.getString(config_prefix + ".endpoint"));
|
||||
checkHDFSURL(uri);
|
||||
if (uri.back() != '/')
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "HDFS path must ends with '/', but '{}' doesn't.", uri);
|
||||
|
||||
std::unique_ptr<HDFSObjectStorageSettings> settings = std::make_unique<HDFSObjectStorageSettings>(
|
||||
config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024),
|
||||
config.getInt(config_prefix + ".objects_chunk_size_to_delete", 1000),
|
||||
context->getSettingsRef().hdfs_replication
|
||||
);
|
||||
std::unique_ptr<HDFSObjectStorageSettings> settings = std::make_unique<HDFSObjectStorageSettings>(
|
||||
config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024),
|
||||
config.getInt(config_prefix + ".objects_chunk_size_to_delete", 1000),
|
||||
context->getSettingsRef().hdfs_replication);
|
||||
|
||||
return createObjectStorage<HDFSObjectStorage>(ObjectStorageType::HDFS, config, config_prefix, uri, std::move(settings), config);
|
||||
});
|
||||
return createObjectStorage<HDFSObjectStorage>(ObjectStorageType::HDFS, config, config_prefix, uri, std::move(settings), config);
|
||||
});
|
||||
}
|
||||
#endif
|
||||
|
||||
@ -317,6 +362,7 @@ void registerObjectStorages()
|
||||
#if USE_AWS_S3
|
||||
registerS3ObjectStorage(factory);
|
||||
registerS3PlainObjectStorage(factory);
|
||||
registerS3PlainRewritableObjectStorage(factory);
|
||||
#endif
|
||||
|
||||
#if USE_HDFS && !defined(CLICKHOUSE_KEEPER_STANDALONE_BUILD)
|
||||
|
24
src/Disks/ObjectStorages/PlainRewritableObjectStorage.h
Normal file
24
src/Disks/ObjectStorages/PlainRewritableObjectStorage.h
Normal file
@ -0,0 +1,24 @@
|
||||
#pragma once
|
||||
|
||||
#include <Disks/ObjectStorages/IObjectStorage.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/// Wrapper over a concrete object storage that marks it as plain and not write-once.
/// All constructor arguments are forwarded to the base storage.
template <typename BaseObjectStorage>
class PlainRewritableObjectStorage : public BaseObjectStorage
{
public:
    template <class... Args>
    explicit PlainRewritableObjectStorage(Args &&... args)
        : BaseObjectStorage(std::forward<Args>(args)...)
    {
    }

    /// The base storage name prefixed with "PlainRewritable".
    std::string getName() const override
    {
        return "PlainRewritable" + BaseObjectStorage::getName();
    }

    bool isWriteOnce() const override
    {
        return false;
    }

    bool isPlain() const override
    {
        return true;
    }
};
|
||||
|
||||
}
|
@ -29,7 +29,10 @@ void registerDiskObjectStorage(DiskFactory & factory, bool global_skip_access_ch
|
||||
if (!config.has(config_prefix + ".metadata_type"))
|
||||
{
|
||||
if (object_storage->isPlain())
|
||||
compatibility_metadata_type_hint = "plain";
|
||||
if (object_storage->isWriteOnce())
|
||||
compatibility_metadata_type_hint = "plain";
|
||||
else
|
||||
compatibility_metadata_type_hint = "plain_rewritable";
|
||||
else
|
||||
compatibility_metadata_type_hint = MetadataStorageFactory::getCompatibilityMetadataTypeHint(object_storage->getType());
|
||||
}
|
||||
@ -53,6 +56,7 @@ void registerDiskObjectStorage(DiskFactory & factory, bool global_skip_access_ch
|
||||
#if USE_AWS_S3
|
||||
factory.registerDiskType("s3", creator); /// For compatibility
|
||||
factory.registerDiskType("s3_plain", creator); /// For compatibility
|
||||
factory.registerDiskType("s3_plain_rewritable", creator); // For compatibility
|
||||
#endif
|
||||
#if USE_HDFS
|
||||
factory.registerDiskType("hdfs", creator); /// For compatibility
|
||||
|
@ -1,4 +1,5 @@
|
||||
#include <Disks/ObjectStorages/S3/S3ObjectStorage.h>
|
||||
#include "Common/ObjectStorageKey.h"
|
||||
|
||||
#if USE_AWS_S3
|
||||
|
||||
@ -568,10 +569,17 @@ ObjectStorageKey S3ObjectStorage::generateObjectKeyForPath(const std::string & p
|
||||
{
|
||||
if (!key_generator)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Key generator is not set");
|
||||
return key_generator->generate(path);
|
||||
|
||||
return key_generator->generate(path, /* is_directory */ false);
|
||||
}
|
||||
|
||||
/// Asks the configured key generator for the key of `path`, treating it as a directory.
/// Throws LOGICAL_ERROR when no key generator has been set.
ObjectStorageKey S3ObjectStorage::generateObjectKeyPrefixForDirectoryPath(const std::string & path) const
{
    if (key_generator)
        return key_generator->generate(path, /* is_directory */ true);

    throw Exception(ErrorCodes::LOGICAL_ERROR, "Key generator is not set");
}
|
||||
}
|
||||
|
||||
#endif
|
||||
|
@ -43,8 +43,6 @@ struct S3ObjectStorageSettings
|
||||
class S3ObjectStorage : public IObjectStorage
|
||||
{
|
||||
private:
|
||||
friend class S3PlainObjectStorage;
|
||||
|
||||
S3ObjectStorage(
|
||||
const char * logger_name,
|
||||
std::unique_ptr<S3::Client> && client_,
|
||||
@ -54,11 +52,11 @@ private:
|
||||
ObjectStorageKeysGeneratorPtr key_generator_,
|
||||
const String & disk_name_)
|
||||
: uri(uri_)
|
||||
, key_generator(std::move(key_generator_))
|
||||
, disk_name(disk_name_)
|
||||
, client(std::move(client_))
|
||||
, s3_settings(std::move(s3_settings_))
|
||||
, s3_capabilities(s3_capabilities_)
|
||||
, key_generator(std::move(key_generator_))
|
||||
, log(getLogger(logger_name))
|
||||
{
|
||||
}
|
||||
@ -161,9 +159,12 @@ public:
|
||||
bool supportParallelWrite() const override { return true; }
|
||||
|
||||
ObjectStorageKey generateObjectKeyForPath(const std::string & path) const override;
|
||||
ObjectStorageKey generateObjectKeyPrefixForDirectoryPath(const std::string & path) const override;
|
||||
|
||||
bool isReadOnly() const override { return s3_settings.get()->read_only; }
|
||||
|
||||
void setKeysGenerator(ObjectStorageKeysGeneratorPtr gen) override { key_generator = gen; }
|
||||
|
||||
private:
|
||||
void setNewSettings(std::unique_ptr<S3ObjectStorageSettings> && s3_settings_);
|
||||
|
||||
@ -172,13 +173,14 @@ private:
|
||||
|
||||
const S3::URI uri;
|
||||
|
||||
ObjectStorageKeysGeneratorPtr key_generator;
|
||||
std::string disk_name;
|
||||
|
||||
MultiVersion<S3::Client> client;
|
||||
MultiVersion<S3ObjectStorageSettings> s3_settings;
|
||||
S3Capabilities s3_capabilities;
|
||||
|
||||
ObjectStorageKeysGeneratorPtr key_generator;
|
||||
|
||||
LoggerPtr log;
|
||||
};
|
||||
|
||||
|
@ -1,9 +1,9 @@
|
||||
#pragma once
|
||||
|
||||
#include <Disks/ObjectStorages/IMetadataStorage.h>
|
||||
#include <Disks/ObjectStorages/MetadataFromDiskTransactionState.h>
|
||||
#include <Disks/ObjectStorages/Web/WebObjectStorage.h>
|
||||
#include <Disks/IDisk.h>
|
||||
#include <Disks/ObjectStorages/IMetadataStorage.h>
|
||||
#include <Disks/ObjectStorages/MetadataStorageTransactionState.h>
|
||||
#include <Disks/ObjectStorages/Web/WebObjectStorage.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
|
@ -2964,6 +2964,10 @@ void MergeTreeData::checkAlterIsPossible(const AlterCommands & commands, Context
|
||||
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED,
|
||||
"Experimental Inverted Index feature is not enabled (turn on setting 'allow_experimental_inverted_index')");
|
||||
|
||||
for (const auto & disk : getDisks())
|
||||
if (!disk->supportsHardLinks())
|
||||
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "ALTER TABLE is not supported for immutable disk '{}'", disk->getName());
|
||||
|
||||
/// Set of columns that shouldn't be altered.
|
||||
NameSet columns_alter_type_forbidden;
|
||||
|
||||
@ -3334,7 +3338,9 @@ void MergeTreeData::checkAlterIsPossible(const AlterCommands & commands, Context
|
||||
|
||||
void MergeTreeData::checkMutationIsPossible(const MutationCommands & /*commands*/, const Settings & /*settings*/) const
|
||||
{
|
||||
/// Some validation will be added
|
||||
for (const auto & disk : getDisks())
|
||||
if (!disk->supportsHardLinks())
|
||||
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Mutations are not supported for immutable disk '{}'", disk->getName());
|
||||
}
|
||||
|
||||
MergeTreeDataPartFormat MergeTreeData::choosePartFormat(size_t bytes_uncompressed, size_t rows_count) const
|
||||
@ -4824,6 +4830,11 @@ void MergeTreeData::removePartContributionToColumnAndSecondaryIndexSizes(const D
|
||||
void MergeTreeData::checkAlterPartitionIsPossible(
|
||||
const PartitionCommands & commands, const StorageMetadataPtr & /*metadata_snapshot*/, const Settings & settings, ContextPtr local_context) const
|
||||
{
|
||||
for (const auto & disk : getDisks())
|
||||
if (!disk->supportsHardLinks())
|
||||
throw Exception(
|
||||
ErrorCodes::SUPPORT_IS_DISABLED, "ALTER TABLE PARTITION is not supported for immutable disk '{}'", disk->getName());
|
||||
|
||||
for (const auto & command : commands)
|
||||
{
|
||||
if (command.type == PartitionCommand::DROP_DETACHED_PARTITION
|
||||
|
@ -0,0 +1,21 @@
|
||||
<clickhouse>
|
||||
<storage_configuration>
|
||||
<disks>
|
||||
<disk_s3_plain_rewritable>
|
||||
<type>s3_plain_rewritable</type>
|
||||
<endpoint>http://minio1:9001/root/data/</endpoint>
|
||||
<access_key_id>minio</access_key_id>
|
||||
<secret_access_key>minio123</secret_access_key>
|
||||
</disk_s3_plain_rewritable>
|
||||
</disks>
|
||||
<policies>
|
||||
<s3_plain_rewritable>
|
||||
<volumes>
|
||||
<main>
|
||||
<disk>disk_s3_plain_rewritable</disk>
|
||||
</main>
|
||||
</volumes>
|
||||
</s3_plain_rewritable>
|
||||
</policies>
|
||||
</storage_configuration>
|
||||
</clickhouse>
|
81
tests/integration/test_s3_plain_rewritable/test.py
Normal file
81
tests/integration/test_s3_plain_rewritable/test.py
Normal file
@ -0,0 +1,81 @@
|
||||
import pytest
|
||||
import random
|
||||
import string
|
||||
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
node = cluster.add_instance(
|
||||
"node",
|
||||
main_configs=["configs/storage_conf.xml"],
|
||||
with_minio=True,
|
||||
stay_alive=True,
|
||||
)
|
||||
|
||||
insert_values = [
|
||||
"(0,'data'),(1,'data')",
|
||||
",".join(
|
||||
f"({i},'{''.join(random.choices(string.ascii_lowercase, k=5))}')"
|
||||
for i in range(10)
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
@pytest.fixture(scope="module", autouse=True)
def start_cluster():
    """Start the cluster once per module and always shut it down afterwards."""
    try:
        cluster.start()
        yield cluster
    finally:
        # Runs even when start-up or a test fails.
        cluster.shutdown()
|
||||
|
||||
|
||||
@pytest.mark.order(0)
def test_insert():
    """Create one table per data set on the s3_plain_rewritable policy, insert, and read back."""
    for index, value in enumerate(insert_values):
        create_query = """
            CREATE TABLE test_{} (
                id Int64,
                data String
            ) ENGINE=MergeTree()
            ORDER BY id
            SETTINGS storage_policy='s3_plain_rewritable'
            """.format(
            index
        )
        node.query(create_query)

        node.query("INSERT INTO test_{} VALUES {}".format(index, value))

        # The Values output format must round-trip exactly what was inserted.
        stored = node.query(
            "SELECT * FROM test_{} ORDER BY id FORMAT Values".format(index)
        )
        assert stored == value
|
||||
|
||||
|
||||
@pytest.mark.order(1)
def test_restart():
    """Check that the table contents are the same before and after a server restart."""

    def assert_tables_intact():
        for index, value in enumerate(insert_values):
            assert (
                node.query(
                    "SELECT * FROM test_{} ORDER BY id FORMAT Values".format(index)
                )
                == value
            )

    assert_tables_intact()
    node.restart_clickhouse()
    assert_tables_intact()
|
||||
|
||||
|
||||
@pytest.mark.order(2)
def test_drop():
    """Drop every test table and check that no objects remain under data/ in the bucket."""
    for index, _ in enumerate(insert_values):
        node.query("DROP TABLE IF EXISTS test_{} SYNC".format(index))

    remaining_objects = list(
        cluster.minio_client.list_objects(
            cluster.minio_bucket, "data/", recursive=True
        )
    )

    assert len(remaining_objects) == 0
|
@ -7,6 +7,12 @@
|
||||
<disk2>
|
||||
<path>/var/lib/clickhouse2/</path>
|
||||
</disk2>
|
||||
<disk3>
|
||||
<type>s3_plain_rewritable</type>
|
||||
<endpoint>http://minio1:9001/root/data/</endpoint>
|
||||
<access_key_id>minio</access_key_id>
|
||||
<secret_access_key>minio123</secret_access_key>
|
||||
</disk3>
|
||||
</disks>
|
||||
<policies>
|
||||
<policy1>
|
||||
@ -23,6 +29,13 @@
|
||||
</volume1>
|
||||
</volumes>
|
||||
</policy2>
|
||||
<s3_plain_rewritable>
|
||||
<volumes>
|
||||
<volume1>
|
||||
<disk>disk3</disk>
|
||||
</volume1>
|
||||
</volumes>
|
||||
</s3_plain_rewritable>
|
||||
</policies>
|
||||
</storage_configuration>
|
||||
</clickhouse>
|
||||
</clickhouse>
|
||||
|
@ -0,0 +1,6 @@
|
||||
|
||||
<clickhouse>
    <!-- system.query_log is created with an explicit engine definition that places it on the 's3_plain_rewritable' storage policy. -->
    <query_log>
        <engine>Engine = MergeTree PARTITION BY event_date ORDER BY event_time TTL event_date + INTERVAL 30 day SETTINGS storage_policy='s3_plain_rewritable', ttl_only_drop_parts=1</engine>
    </query_log>
</clickhouse>
|
@ -35,6 +35,18 @@ node3 = cluster.add_instance(
|
||||
)
|
||||
|
||||
|
||||
# Instance for the s3_plain_rewritable system-log test: it loads the query_log
# engine override together with the disk/policy definitions, and is started
# with MinIO. stay_alive is set because the test restarts this instance.
node4 = cluster.add_instance(
    "node4",
    with_minio=True,
    stay_alive=True,
    base_config_dir="configs",
    main_configs=[
        "configs/config.d/system_logs_engine_s3_plain_rewritable_policy.xml",
        "configs/config.d/disks.xml",
    ],
)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module", autouse=True)
|
||||
def start_cluster():
|
||||
try:
|
||||
@ -78,6 +90,22 @@ def test_system_logs_engine_expr(start_cluster):
|
||||
)
|
||||
|
||||
|
||||
def test_system_logs_engine_s3_plain_rw_expr(start_cluster):
    """Check that system.query_log on node4 reports the configured engine, before and after a restart.

    The expected engine_full text is the normalized form of the engine
    definition in the node's query_log config, including the
    's3_plain_rewritable' storage policy.
    """
    engine_full_query = "SELECT engine_full FROM system.tables WHERE database='system' and name='query_log'"
    expected = "MergeTree PARTITION BY event_date ORDER BY event_time TTL event_date + toIntervalDay(30) SETTINGS storage_policy = \\'s3_plain_rewritable\\', ttl_only_drop_parts = 1"

    # Issue some queries and flush the logs before inspecting system.query_log.
    for statement in (
        "SET log_query_threads = 1",
        "SELECT count() FROM system.tables",
        "SYSTEM FLUSH LOGS",
    ):
        node4.query(statement)

    # Check 'engine_full' of system.query_log.
    assert expected in node4.query(engine_full_query)

    # The same definition must be reported after the server comes back up.
    node4.restart_clickhouse()
    assert expected in node4.query(engine_full_query)
|
||||
|
||||
|
||||
def test_system_logs_settings_expr(start_cluster):
|
||||
node3.query("SET log_query_threads = 1")
|
||||
node3.query("SELECT count() FROM system.tables")
|
||||
|
@ -0,0 +1,11 @@
|
||||
10006
|
||||
0 0 0
|
||||
1 1 1
|
||||
1 2 0
|
||||
2 2 2
|
||||
2 2 2
|
||||
3 1 9
|
||||
3 3 3
|
||||
4 4 4
|
||||
4 7 7
|
||||
5 5 5
|
35
tests/queries/0_stateless/03008_s3_plain_rewritable.sql
Normal file
35
tests/queries/0_stateless/03008_s3_plain_rewritable.sql
Normal file
@ -0,0 +1,35 @@
|
||||
-- Tags: no-fasttest
-- Tag no-fasttest: requires S3

-- MergeTree table on a dynamically defined s3_plain_rewritable disk.
drop table if exists test_mt;

create table test_mt (a Int32, b Int64, c Int64)
engine = MergeTree()
partition by intDiv(a, 1000)
order by tuple(a, b)
settings disk = disk(
    name = s3_plain_rewritable,
    type = s3_plain_rewritable,
    endpoint = 'http://localhost:11111/test/test_mt/',
    access_key_id = clickhouse,
    secret_access_key = clickhouse
);

-- Inserts: 6 explicit rows plus 10000 generated rows.
insert into test_mt (a, b, c)
values (1, 2, 0), (2, 2, 2), (3, 1, 9), (4, 7, 7), (5, 10, 2), (6, 12, 5);

insert into test_mt (a, b, c)
select number, number, number
from numbers_mt(10000);

-- Reads: total row count and the first 10 rows in sorting-key order.
select count(*)
from test_mt;

select a, b, c
from test_mt
order by tuple(a, b)
limit 10;

-- Merging all parts is expected to succeed.
optimize table test_mt final;

-- Projections, mutations and column changes are expected to be rejected.
alter table test_mt add projection test_mt_projection (
    select * order by b
); -- { serverError SUPPORT_IS_DISABLED }

alter table test_mt update c = 0 where a % 2 = 1; -- { serverError SUPPORT_IS_DISABLED }
alter table test_mt add column d Int64 after c; -- { serverError SUPPORT_IS_DISABLED }
alter table test_mt drop column c; -- { serverError SUPPORT_IS_DISABLED }

-- A detach/attach cycle is expected to succeed.
detach table test_mt;
attach table test_mt;

-- Moving a partition to another table on the same disk is expected to be rejected.
drop table if exists test_mt_dst;

create table test_mt_dst (a Int32, b Int64, c Int64)
engine = MergeTree()
partition by intDiv(a, 1000)
order by tuple(a, b)
settings disk = 's3_plain_rewritable';

alter table test_mt move partition 0 to table test_mt_dst; -- { serverError SUPPORT_IS_DISABLED }
|
@ -2321,6 +2321,7 @@ retentions
|
||||
rethrow
|
||||
retransmit
|
||||
retriable
|
||||
rewritable
|
||||
reverseUTF
|
||||
rightPad
|
||||
rightPadUTF
|
||||
|
Loading…
Reference in New Issue
Block a user