Merging DiskS3

This commit is contained in:
Alexey Milovidov 2020-01-19 02:18:23 +03:00
parent 4da00994d1
commit 2d39e5ad8c
10 changed files with 119 additions and 73 deletions

View File

@ -159,9 +159,14 @@ std::unique_ptr<WriteBuffer> DiskLocal::writeFile(const String & path, size_t bu
return std::make_unique<WriteBufferFromFile>(disk_path + path, buf_size, flags);
}
/// Removes the file or directory at `disk_path + path` through Poco::File::remove.
/// NOTE(review): this span comes from a rendered diff hunk, so it shows both the previous
/// form (with the `recursive` flag) and the replacement form (always non-recursive).
void DiskLocal::remove(const String & path, bool recursive)
void DiskLocal::remove(const String & path)
{
Poco::File(disk_path + path).remove(recursive);
Poco::File(disk_path + path).remove(false);
}
void DiskLocal::removeRecursive(const String & path)
{
Poco::File(disk_path + path).remove(true);
}
void DiskLocalReservation::update(UInt64 new_size)

View File

@ -70,7 +70,9 @@ public:
std::unique_ptr<WriteBuffer> writeFile(const String & path, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, WriteMode mode = WriteMode::Rewrite) override;
void remove(const String & path, bool recursive) override;
void remove(const String & path) override;
void removeRecursive(const String & path) override;
private:
bool tryReserve(UInt64 bytes);

View File

@ -229,7 +229,7 @@ std::unique_ptr<WriteBuffer> DiskMemory::writeFile(const String & path, size_t /
return std::make_unique<WriteBufferFromString>(iter->second.data);
}
void DiskMemory::remove(const String & path, bool recursive)
void DiskMemory::remove(const String & path)
{
std::lock_guard lock(mutex);
@ -237,24 +237,32 @@ void DiskMemory::remove(const String & path, bool recursive)
if (file_it == files.end())
throw Exception("File '" + path + "' doesn't exist", ErrorCodes::FILE_DOESNT_EXIST);
if (file_it->second.type == FileType::File)
if (file_it->second.type == FileType::Directory)
{
files.erase(file_it);
return;
if (std::any_of(files.begin(), files.end(), [path](const auto & file) { return parentPath(file.first) == path; }))
throw Exception("Directory '" + path + "' is not empty", ErrorCodes::CANNOT_DELETE_DIRECTORY);
}
if (!recursive && std::any_of(files.begin(), files.end(), [path](const auto & file) { return parentPath(file.first) == path; }))
throw Exception("Directory '" + path + "' is not empty", ErrorCodes::CANNOT_DELETE_DIRECTORY);
if (recursive)
else
{
for (auto iter = files.begin(); iter != files.end();)
{
if (iter->first.size() >= path.size() && std::string_view(iter->first.data(), path.size()) == path)
iter = files.erase(iter);
else
++iter;
}
files.erase(file_it);
}
}
/// Erases every entry of `files` whose key starts with `path`, including the entry for `path` itself.
/// Throws FILE_DOESNT_EXIST when `path` has no entry of its own.
void DiskMemory::removeRecursive(const String & path)
{
    std::lock_guard lock(mutex);

    if (files.find(path) == files.end())
        throw Exception("File '" + path + "' doesn't exist", ErrorCodes::FILE_DOESNT_EXIST);

    /// NOTE(review): this is a plain prefix comparison with no path-separator check, so a key
    /// such as "dir2" also matches `path` == "dir" — confirm how keys are normalised.
    auto entry = files.begin();
    while (entry != files.end())
    {
        const std::string_view key(entry->first.data(), entry->first.size());
        const bool starts_with_path = key.substr(0, path.size()) == path;

        if (starts_with_path)
            entry = files.erase(entry);
        else
            ++entry;
    }
}

View File

@ -67,7 +67,9 @@ public:
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
WriteMode mode = WriteMode::Rewrite) override;
void remove(const String & path, bool recursive) override;
void remove(const String & path) override;
void removeRecursive(const String & path) override;
private:
void createDirectoriesImpl(const String & path);

View File

@ -3,14 +3,19 @@
#if USE_AWS_S3
# include "DiskFactory.h"
# include <IO/ReadBufferFromS3.h>
# include <IO/S3Common.h>
# include <IO/WriteBufferFromS3.h>
# include <Poco/File.h>
# include <Poco/FileStream.h>
# include <Common/quoteString.h>
# include <random>
# include <IO/S3Common.h>
# include <IO/ReadBufferFromS3.h>
# include <IO/WriteBufferFromS3.h>
# include <IO/ReadBufferFromFile.h>
# include <IO/WriteBufferFromFile.h>
# include <IO/ReadHelpers.h>
# include <IO/WriteHelpers.h>
# include <Poco/File.h>
# include <Common/checkStackSize.h>
# include <Common/quoteString.h>
# include <Common/thread_local_rng.h>
# include <aws/s3/model/CopyObjectRequest.h>
# include <aws/s3/model/DeleteObjectRequest.h>
# include <aws/s3/model/GetObjectRequest.h>
@ -19,26 +24,39 @@ namespace DB
{
/// Error codes referenced by this translation unit. They are only declared here (`extern`);
/// the definitions are not part of this file.
namespace ErrorCodes
{
extern const int CANNOT_DELETE_DIRECTORY;
extern const int FILE_ALREADY_EXISTS;
extern const int FILE_DOESNT_EXIST;
extern const int PATH_ACCESS_DENIED;
extern const int S3_ERROR;
extern const int UNKNOWN_ELEMENT_IN_CONFIG;
}
namespace
{
/// Converts a failed AWS SDK outcome into an Exception carrying the SDK error message.
/// Does nothing when `response.IsSuccess()` is true.
/// NOTE(review): the exception code is the AWS error-type enum value cast to int, not
/// ErrorCodes::S3_ERROR (which this file declares) — confirm that this is intended.
/// NOTE(review): this span comes from a rendered diff hunk, so the old and new template
/// headers and the old and new `err` declarations both appear.
template <typename R, typename E>
void throwIfError(Aws::Utils::Outcome<R, E> && response)
template <typename Result, typename Error>
void throwIfError(Aws::Utils::Outcome<Result, Error> && response)
{
if (!response.IsSuccess())
{
auto & err = response.GetError();
const auto & err = response.GetError();
throw Exception(err.GetMessage(), static_cast<int>(err.GetErrorType()));
}
}
/// Reads the file at `path` up to EOF and returns everything read as the key.
String readKeyFromFile(const String & path)
{
    /// 1024 bytes: reasonable buffer size for small file.
    ReadBufferFromFile in(path, 1024);

    String result;
    readStringUntilEOF(result, in);
    return result;
}
void writeKeyToFile(const String & key, const String & path)
{
WriteBufferFromFile buf(path, 1024);
writeString(key, buf);
buf.next();
}
/// Stores data in S3 and the object key in file in local filesystem.
class WriteIndirectBufferFromS3 : public WriteBufferFromS3
{
public:
@ -57,17 +75,15 @@ namespace
/// Completes the S3 upload, then stores the S3 object key in the local metadata file
/// and marks this buffer as finalized.
void finalize() override
{
    WriteBufferFromS3::finalize();

    /// writeKeyToFile takes (key, path): the S3 object key is the content to store and the
    /// metadata file is the destination. The arguments were previously passed the other way round.
    writeKeyToFile(s3_path, metadata_path);

    finalized = true;
}
/// Finalizes the buffer on destruction when finalize() has not been called explicitly.
/// NOTE(review): finalization performs the S3 upload and a local file write from inside a
/// destructor; whether those calls can throw is not visible here — confirm.
/// NOTE(review): this span comes from a rendered diff hunk, so it shows both the previous
/// inline finalization and the replacement early-return form.
~WriteIndirectBufferFromS3() override
{
if (!finalized)
{
WriteBufferFromS3::finalize();
Poco::FileOutputStream(metadata_path) << s3_path;
}
if (finalized)
return;
finalize();
}
private:
@ -140,6 +156,13 @@ DiskDirectoryIteratorPtr DiskS3::iterateDirectory(const String & path)
return std::make_unique<DiskS3DirectoryIterator>(metadata_path + path, path);
}
/// Removes every entry directly under `path` for which isFile() is true.
/// Entries that are not files are left in place.
void DiskS3::clearDirectory(const String & path)
{
    auto entry = iterateDirectory(path);
    while (entry->isValid())
    {
        if (isFile(entry->path()))
            remove(entry->path());

        entry->next();
    }
}
void DiskS3::moveFile(const String & from_path, const String & to_path)
{
if (exists(to_path))
@ -156,7 +179,7 @@ void DiskS3::replaceFile(const String & from_path, const String & to_path)
Poco::File tmp_file(metadata_path + to_path + ".old");
to_file.renameTo(tmp_file.path());
from_file.renameTo(metadata_path + to_path);
remove(to_path + ".old", false);
remove(to_path + ".old");
}
else
from_file.renameTo(to_file.path());
@ -165,18 +188,17 @@ void DiskS3::replaceFile(const String & from_path, const String & to_path)
/// Copies the S3 object referenced by `from_path` to a new object with a random name
/// under `s3_root_path`, and writes the new key into the metadata file of `to_path`.
/// Anything already present at `to_path` is removed first.
/// Throws (via throwIfError) when the S3 CopyObject request fails.
void DiskS3::copyFile(const String & from_path, const String & to_path)
{
    if (exists(to_path))
        remove(to_path);

    String s3_from_path = readKeyFromFile(metadata_path + from_path);
    String s3_to_path = s3_root_path + getRandomName();

    Aws::S3::Model::CopyObjectRequest req;
    req.SetBucket(bucket);
    /// S3 expects the copy source as "<source bucket>/<source key>"; a bare key is not enough.
    req.SetCopySource(bucket + "/" + s3_from_path);
    req.SetKey(s3_to_path);
    throwIfError(client->CopyObject(req));

    /// Record the key only after the copy succeeded.
    writeKeyToFile(s3_to_path, metadata_path + to_path);
}
std::unique_ptr<ReadBuffer> DiskS3::readFile(const String & path, size_t buf_size) const
@ -203,7 +225,7 @@ std::unique_ptr<WriteBuffer> DiskS3::writeFile(const String & path, size_t buf_s
}
}
void DiskS3::remove(const String & path, bool recursive)
void DiskS3::remove(const String & path)
{
Poco::File file(metadata_path + path);
if (file.isFile())
@ -212,19 +234,28 @@ void DiskS3::remove(const String & path, bool recursive)
request.SetBucket(bucket);
request.SetKey(getS3Path(path));
throwIfError(client->DeleteObject(request));
}
file.remove();
}
Poco::File(metadata_path + path).remove(true);
/// Removes `path` together with everything below it.
/// For a file: deletes the S3 object whose key is stored in the metadata file, then removes
/// the metadata file. For anything else: recurses into every directory entry first, then
/// removes the directory itself.
/// NOTE(review): this span comes from a rendered diff hunk, so lines of the previous
/// `remove(path, recursive)` implementation appear inside the `else` branch.
void DiskS3::removeRecursive(const String & path)
{
checkStackSize(); /// This is needed to prevent stack overflow in case of cyclic symlinks.
Poco::File file(metadata_path + path);
if (file.isFile())
{
Aws::S3::Model::DeleteObjectRequest request;
request.SetBucket(bucket);
request.SetKey(getS3Path(path));
throwIfError(client->DeleteObject(request));
}
else
{
auto it{iterateDirectory(path)};
if (!recursive && it->isValid())
throw Exception("Directory " + path + "is not empty", ErrorCodes::CANNOT_DELETE_DIRECTORY);
for (; it->isValid(); it->next())
remove(it->path(), true);
file.remove(false);
for (auto it{iterateDirectory(path)}; it->isValid(); it->next())
removeRecursive(it->path());
}
file.remove();
}
String DiskS3::getS3Path(const String & path) const
@ -232,18 +263,15 @@ String DiskS3::getS3Path(const String & path) const
if (!exists(path))
throw Exception("File not found: " + path, ErrorCodes::FILE_DOESNT_EXIST);
String s3_path;
Poco::FileInputStream(metadata_path + path) >> s3_path;
return s3_path;
return readKeyFromFile(metadata_path + path);
}
/// Returns a 16-character name whose characters are drawn uniformly from 'a'..'z'.
/// NOTE(review): this span comes from a rendered diff hunk, so both the previous locally
/// seeded std::mt19937 generator and the replacement `thread_local_rng` appear.
String DiskS3::getRandomName() const
{
std::mt19937 random{std::random_device{}()};
std::uniform_int_distribution<int> distribution('a', 'z');
String suffix(16, ' ');
for (auto & c : suffix)
c = distribution(random);
c = distribution(thread_local_rng);
return suffix;
}
@ -320,6 +348,8 @@ void registerDiskS3(DiskFactory & factory)
auto s3disk = std::make_shared<DiskS3>(name, client, uri.bucket, uri.key, metadata_path);
/// This code is used only to check access to the corresponding disk.
{
auto file = s3disk->writeFile("test_acl", DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite);
file->write("test", 4);
@ -332,8 +362,7 @@ void registerDiskS3(DiskFactory & factory)
throw Exception("No read accecss to S3 bucket in disk " + name, ErrorCodes::PATH_ACCESS_DENIED);
}
{
String s3_path;
Poco::FileInputStream(metadata_path + "test_acl") >> s3_path;
String s3_path = readKeyFromFile(metadata_path + "test_acl");
Aws::S3::Model::DeleteObjectRequest request;
request.SetBucket(uri.bucket);
request.SetKey(s3_path);

View File

@ -49,12 +49,7 @@ public:
void createDirectories(const String & path) override;
void clearDirectory(const String & path) override
{
for (auto it{iterateDirectory(path)}; it->isValid(); it->next())
if (isFile(it->path()))
remove(it->path(), false);
}
void clearDirectory(const String & path) override;
/// Moving a directory is delegated to moveFile with the same arguments.
void moveDirectory(const String & from_path, const String & to_path) override { moveFile(from_path, to_path); }
@ -70,7 +65,9 @@ public:
std::unique_ptr<WriteBuffer> writeFile(const String & path, size_t buf_size, WriteMode mode) override;
virtual void remove(const String & path, bool recursive) override;
void remove(const String & path) override;
void removeRecursive(const String & path) override;
private:
String getS3Path(const String & path) const;

View File

@ -98,7 +98,7 @@ public:
/// Create directory and all parent directories if necessary.
virtual void createDirectories(const String & path) = 0;
/// Remove all files from the directory.
/// Remove all files from the directory. Directories are not removed.
virtual void clearDirectory(const String & path) = 0;
/// Move directory from `from_path` to `to_path`.
@ -127,8 +127,11 @@ public:
/// Open the file for write and return WriteBuffer object.
virtual std::unique_ptr<WriteBuffer> writeFile(const String & path, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, WriteMode mode = WriteMode::Rewrite) = 0;
/// Remove file or directory
virtual void remove(const String & path, bool recursive) = 0;
/// Remove file or directory. Throws exception if file doesn't exist or if directory is not empty.
virtual void remove(const String & path) = 0;
/// Remove file or directory with all children. Use with extra caution. Throws exception if file doesn't exist.
virtual void removeRecursive(const String & path) = 0;
public:
/// Used for reservation counters modification

View File

@ -93,10 +93,10 @@ namespace S3
if (!endpoint.empty())
cfg.endpointOverride = endpoint;
Aws::Auth::AWSCredentials cred(access_key_id, secret_access_key);
Aws::Auth::AWSCredentials credentials(access_key_id, secret_access_key);
return std::make_shared<Aws::S3::S3Client>(
cred, // Aws credentials.
credentials, // Aws credentials.
std::move(cfg), // Client configuration.
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, // Sign policy.
endpoint.empty() // Use virtual addressing only if endpoint is not specified.

View File

@ -49,7 +49,7 @@ struct URI
String bucket;
String key;
explicit URI (const Poco::URI & uri_);
explicit URI(const Poco::URI & uri_);
};
}

View File

@ -428,7 +428,7 @@ void StorageTinyLog::truncate(const ASTPtr &, const Context &, TableStructureWri
/// Drops the table's data: takes an exclusive lock on `rwlock`, removes `table_path` from
/// the disk, and clears the in-memory `files` collection.
/// NOTE(review): this span comes from a rendered diff hunk, so both the previous
/// `remove(table_path, true)` call and the replacement `removeRecursive(table_path)` call appear.
void StorageTinyLog::drop(TableStructureWriteLockHolder &)
{
std::unique_lock<std::shared_mutex> lock(rwlock);
disk->remove(table_path, true);
disk->removeRecursive(table_path);
files.clear();
}