Reimplement

This commit is contained in:
kssenii 2021-06-18 11:07:41 +00:00
parent cd885ee750
commit 0fa6dc4570
8 changed files with 393 additions and 133 deletions

View File

@ -1,16 +1,24 @@
#include "DiskWebServer.h"
#include <common/logger_useful.h>
#include <Common/quoteString.h>
#include <Interpreters/Context.h>
#include <Disks/ReadIndirectBufferFromRemoteFS.h>
#include <Disks/WriteIndirectBufferFromRemoteFS.h>
#include <IO/WriteBufferFromHTTP.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <IO/ReadIndirectBufferFromWebServer.h>
#include <IO/SeekAvoidingReadBuffer.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Disks/ReadIndirectBufferFromRemoteFS.h>
#include <Disks/IDiskRemote.h>
#include <re2/re2.h>
#define DIRECTORY_FILE_PATTERN(prefix) fmt::format("{}-(\\w+)-(\\w+\\.\\w+)", prefix)
#define ROOT_FILE_PATTERN(prefix) fmt::format("{}-(\\w+\\.\\w+)", prefix)
#define MATCH_DIRECTORY_FILE_PATTERN(prefix) fmt::format("{}/(\\w+)/(\\w+\\.\\w+)", prefix)
#define MATCH_ROOT_FILE_PATTERN(prefix) fmt::format("{}/(\\w+\\.\\w+)", prefix)
#define MATCH_DIRECTORY_PATTERN(prefix) fmt::format("{}/(\\w+)", prefix)
namespace DB
@ -22,17 +30,91 @@ namespace ErrorCodes
}
static const auto store_uuid_prefix = ".*/[\\w]{3}/[\\w]{8}-[\\w]{4}-[\\w]{4}-[\\w]{4}-[\\w]{12}";
/// Fetch contents of .index file from given uri path.
void DiskWebServer::Metadata::initialize(const String & uri_with_path, const String & files_prefix, ContextPtr context) const
{
ReadWriteBufferFromHTTP metadata_buf(Poco::URI(fs::path(uri_with_path) / ".index"),
Poco::Net::HTTPRequest::HTTP_GET,
ReadWriteBufferFromHTTP::OutStreamCallback(),
ConnectionTimeouts::getHTTPTimeouts(context));
String directory, file, remote_file_name;
size_t file_size;
while (!metadata_buf.eof())
{
readText(remote_file_name, metadata_buf);
assertChar('\t', metadata_buf);
readIntText(file_size, metadata_buf);
assertChar('\n', metadata_buf);
LOG_DEBUG(&Poco::Logger::get("DiskWeb"), "Read file: {}, size: {}", remote_file_name, file_size);
/*
* URI/ {prefix}-all_x_x_x-{file}
* ...
* {prefix}-format_version.txt
* {prefix}-detached-{file}
* ...
*/
if (RE2::FullMatch(remote_file_name, re2::RE2(DIRECTORY_FILE_PATTERN(files_prefix)), &directory, &file))
{
files[directory].insert({file, file_size});
}
else if (RE2::FullMatch(remote_file_name, re2::RE2(ROOT_FILE_PATTERN(files_prefix)), &file))
{
files[file].insert({file, file_size});
}
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected file: {}", remote_file_name);
}
}
/* Iterate list of files from .index file on a web server (its contents were put
* into DiskWebServer::Metadata) and convert them into paths as though paths in local fs.
*/
class DiskWebDirectoryIterator final : public IDiskDirectoryIterator
{
public:
DiskWebDirectoryIterator(DiskWebServer::Metadata & metadata_, const String & directory_root_)
: metadata(metadata_), iter(metadata.files.begin()), directory_root(directory_root_)
{
}
void next() override { ++iter; }
bool isValid() const override { return iter != metadata.files.end(); }
String path() const override
{
return fs::path(directory_root) / name();
}
String name() const override
{
return iter->first;
}
private:
DiskWebServer::Metadata & metadata;
DiskWebServer::FilesDirectory::iterator iter;
const String directory_root;
};
class ReadBufferFromWebServer final : public ReadIndirectBufferFromRemoteFS<ReadIndirectBufferFromWebServer>
{
public:
ReadBufferFromWebServer(
const String & url_,
DiskWebServer::Metadata metadata_,
const String & uri_,
RemoteMetadata metadata_,
ContextPtr context_,
size_t max_read_tries_,
size_t buf_size_)
: ReadIndirectBufferFromRemoteFS<ReadIndirectBufferFromWebServer>(metadata_)
, url(url_)
, uri(uri_)
, context(context_)
, max_read_tries(max_read_tries_)
, buf_size(buf_size_)
@ -41,55 +123,151 @@ public:
std::unique_ptr<ReadIndirectBufferFromWebServer> createReadBuffer(const String & path) override
{
return std::make_unique<ReadIndirectBufferFromWebServer>(fs::path(url) / path, context, max_read_tries, buf_size);
return std::make_unique<ReadIndirectBufferFromWebServer>(fs::path(uri) / path, context, max_read_tries, buf_size);
}
private:
String url;
String uri;
ContextPtr context;
size_t max_read_tries;
size_t buf_size;
};
class WriteBufferFromNothing : public WriteBufferFromFile
{
public:
WriteBufferFromNothing() : WriteBufferFromFile("/dev/null") {}
void sync() override {}
};
DiskWebServer::DiskWebServer(
const String & disk_name_,
const String & files_root_path_url_,
const String & uri_,
const String & metadata_path_,
ContextPtr context_,
SettingsPtr settings_)
: IDiskRemote(disk_name_, files_root_path_url_, metadata_path_, "DiskWebServer", settings_->thread_pool_size)
, WithContext(context_->getGlobalContext())
: WithContext(context_->getGlobalContext())
, log(&Poco::Logger::get("DiskWeb"))
, uri(uri_)
, name(disk_name_)
, metadata_path(metadata_path_)
, settings(std::move(settings_))
{
}
String DiskWebServer::getFileName(const String & path) const
{
String result;
if (RE2::FullMatch(path, MATCH_DIRECTORY_FILE_PATTERN(store_uuid_prefix))
&& RE2::Extract(path, MATCH_DIRECTORY_FILE_PATTERN(".*"), fmt::format("{}-\\1-\\2", settings->files_prefix), &result))
return result;
if (RE2::FullMatch(path, MATCH_ROOT_FILE_PATTERN(store_uuid_prefix))
&& RE2::Extract(path, MATCH_ROOT_FILE_PATTERN(".*"), fmt::format("{}-\\1", settings->files_prefix), &result))
return result;
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected file: {}", path);
}
bool DiskWebServer::findFileInMetadata(const String & path, FileAndSize & file_info) const
{
if (metadata.files.empty())
metadata.initialize(uri, settings->files_prefix, getContext());
String directory_name, file_name;
if (RE2::FullMatch(path, MATCH_DIRECTORY_FILE_PATTERN(store_uuid_prefix), &directory_name, &file_name))
{
const auto & directory_files = metadata.files.find(directory_name)->second;
auto file = directory_files.find(file_name);
if (file == directory_files.end())
return false;
file_info = std::make_pair(file_name, file->second);
}
else if (RE2::FullMatch(path, MATCH_ROOT_FILE_PATTERN(store_uuid_prefix), &file_name))
{
auto file = metadata.files.find(file_name);
if (file == metadata.files.end())
return false;
file_info = std::make_pair(file_name, file->second.find(file_name)->second);
}
else
return false;
return true;
}
bool DiskWebServer::exists(const String & path) const
{
LOG_DEBUG(log, "Checking existance of file: {}", path);
/// Assume root directory exists.
if (re2::RE2::FullMatch(path, re2::RE2(fmt::format("({})/", store_uuid_prefix))))
return true;
FileAndSize file;
return findFileInMetadata(path, file);
}
std::unique_ptr<ReadBufferFromFileBase> DiskWebServer::readFile(const String & path, size_t buf_size, size_t, size_t, size_t, MMappedFileCache *) const
{
auto metadata = readMeta(path);
LOG_DEBUG(log, "Read from file by path: {}", path);
LOG_DEBUG(log, "Read from file by path: {}. Existing objects: {}", backQuote(metadata_path + path), metadata.remote_fs_objects.size());
FileAndSize file;
if (!findFileInMetadata(path, file))
throw Exception(ErrorCodes::LOGICAL_ERROR, "File {} not found", path);
auto reader = std::make_unique<ReadBufferFromWebServer>(remote_fs_root_path, metadata, getContext(), 1, buf_size);
RemoteMetadata meta(uri, fs::path(path).parent_path() / fs::path(path).filename());
meta.remote_fs_objects.emplace_back(std::make_pair(getFileName(path), file.second));
auto reader = std::make_unique<ReadBufferFromWebServer>(uri, meta, getContext(), settings->max_read_tries, buf_size);
return std::make_unique<SeekAvoidingReadBuffer>(std::move(reader), settings->min_bytes_for_seek);
}
std::unique_ptr<WriteBufferFromFileBase> DiskWebServer::writeFile(const String & path, size_t buf_size, WriteMode mode)
std::unique_ptr<WriteBufferFromFileBase> DiskWebServer::writeFile(const String &, size_t, WriteMode)
{
auto metadata = readOrCreateMetaForWriting(path, mode);
return std::make_unique<WriteBufferFromNothing>();
}
auto file_name = generateName();
String file_path = fs::path(remote_fs_root_path) / file_name;
LOG_DEBUG(log, "Write to file url: {}", file_path);
DiskDirectoryIteratorPtr DiskWebServer::iterateDirectory(const String & path)
{
LOG_DEBUG(log, "Iterate directory: {}", path);
return std::make_unique<DiskWebDirectoryIterator>(metadata, path);
}
auto timeouts = ConnectionTimeouts::getHTTPTimeouts(getContext());
Poco::URI uri(file_path);
auto writer = std::make_unique<WriteBufferFromHTTP>(uri, Poco::Net::HTTPRequest::HTTP_PUT, timeouts, buf_size);
return std::make_unique<WriteIndirectBufferFromRemoteFS<WriteBufferFromHTTP>>(std::move(writer), std::move(metadata), file_name);
size_t DiskWebServer::getFileSize(const String & path) const
{
FileAndSize file;
if (!findFileInMetadata(path, file))
throw Exception(ErrorCodes::LOGICAL_ERROR, "File {} not found", path);
return file.second;
}
bool DiskWebServer::isFile(const String & path) const
{
return RE2::FullMatch(path, MATCH_ROOT_FILE_PATTERN(".*")) || RE2::FullMatch(path, MATCH_DIRECTORY_FILE_PATTERN(".*"));
}
bool DiskWebServer::isDirectory(const String & path) const
{
return RE2::FullMatch(path, MATCH_DIRECTORY_PATTERN(".*"));
}
@ -103,18 +281,18 @@ void registerDiskWebServer(DiskFactory & factory)
fs::path disk = fs::path(context->getPath()) / "disks" / disk_name;
fs::create_directories(disk);
String url{config.getString(config_prefix + ".endpoint")};
if (!url.ends_with('/'))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "URL must end with '/', but '{}' doesn't.", url);
String uri{config.getString(config_prefix + ".endpoint")};
if (!uri.ends_with('/'))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "URI must end with '/', but '{}' doesn't.", uri);
auto settings = std::make_unique<DiskWebServerSettings>(
context->getGlobalContext()->getSettingsRef().http_max_single_read_retries,
config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024),
config.getInt(config_prefix + ".thread_pool_size", 16));
config.getString(config_prefix + ".files_prefix", disk_name));
String metadata_path = fs::path(context->getPath()) / "disks" / disk_name / "";
return std::make_shared<DiskWebServer>(disk_name, url, metadata_path, context, std::move(settings));
return std::make_shared<DiskWebServer>(disk_name, uri, metadata_path, context, std::move(settings));
};
factory.registerDiskType("web", creator);

View File

@ -1,7 +1,9 @@
#pragma once
#include <Disks/IDiskRemote.h>
#include <IO/WriteBufferFromFile.h>
#include <Core/UUID.h>
#include <set>
namespace DB
@ -13,43 +15,137 @@ struct DiskWebServerSettings
size_t max_read_tries;
/// Passed to SeekAvoidingReadBuffer.
size_t min_bytes_for_seek;
/// Used by IDiskRemote.
size_t thread_pool_size;
String files_prefix;
DiskWebServerSettings(size_t max_read_tries_, size_t min_bytes_for_seek_, size_t thread_pool_size_)
: max_read_tries(max_read_tries_) , min_bytes_for_seek(min_bytes_for_seek_) , thread_pool_size(thread_pool_size_) {}
DiskWebServerSettings(size_t max_read_tries_, size_t min_bytes_for_seek_, String files_prefix_)
: max_read_tries(max_read_tries_) , min_bytes_for_seek(min_bytes_for_seek_), files_prefix(files_prefix_) {}
};
/// Storage to store data on a web server and metadata on the local disk.
class DiskWebServer : public IDiskRemote, WithContext
class DiskWebServer : public IDisk, WithContext
{
using SettingsPtr = std::unique_ptr<DiskWebServerSettings>;
public:
DiskWebServer(const String & disk_name_,
const String & files_root_path_url_,
const String & metadata_path_,
ContextPtr context,
SettingsPtr settings_);
const String & files_root_path_uri_,
const String & metadata_path_,
ContextPtr context,
SettingsPtr settings_);
using FileAndSize = std::pair<String, size_t>;
using FilesInfo = std::unordered_map<String, size_t>;
using FilesDirectory = std::map<String, FilesInfo>;
struct Metadata
{
/// Fetch meta only when required.
mutable FilesDirectory files;
Metadata() {}
void initialize(const String & uri_with_path, const String & files_prefix, ContextPtr context) const;
};
bool findFileInMetadata(const String & path, FileAndSize & file_info) const;
String getFileName(const String & path) const;
DiskType::Type getType() const override { return DiskType::Type::WebServer; }
std::unique_ptr<ReadBufferFromFileBase> readFile(
const String & path,
size_t buf_size,
size_t estimated_size,
size_t aio_threshold,
size_t mmap_threshold,
MMappedFileCache * mmap_cache) const override;
std::unique_ptr<ReadBufferFromFileBase> readFile(const String & path,
size_t buf_size,
size_t estimated_size,
size_t aio_threshold,
size_t mmap_threshold,
MMappedFileCache * mmap_cache) const override;
/// Disk info
std::unique_ptr<WriteBufferFromFileBase> writeFile(const String & path, size_t buf_size, WriteMode mode) override;
const String & getName() const final override { return name; }
/// ???
const String & getPath() const final override { return metadata_path; }
UInt64 getTotalSpace() const final override { return std::numeric_limits<UInt64>::max(); }
UInt64 getAvailableSpace() const final override { return std::numeric_limits<UInt64>::max(); }
UInt64 getUnreservedSpace() const final override { return std::numeric_limits<UInt64>::max(); }
/// Read-only part
bool exists(const String & path) const override;
bool isFile(const String & path) const override;
size_t getFileSize(const String & path) const override;
void listFiles(const String & /* path */, std::vector<String> & /* file_names */) override { }
void setReadOnly(const String & /* path */) override {}
bool isDirectory(const String & path) const override;
DiskDirectoryIteratorPtr iterateDirectory(const String & /* path */) override;
Poco::Timestamp getLastModified(const String &) override { return Poco::Timestamp{}; }
ReservationPtr reserve(UInt64 /*bytes*/) override { return nullptr; }
/// Write and modification part
std::unique_ptr<WriteBufferFromFileBase> writeFile(const String &, size_t, WriteMode) override;
void moveFile(const String &, const String &) override {}
void replaceFile(const String &, const String &) override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Disk {} is read-only", getName());
}
void removeFile(const String &) override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Disk {} is read-only", getName());
}
void removeFileIfExists(const String &) override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Disk {} is read-only", getName());
}
void removeRecursive(const String &) override {}
void removeSharedFile(const String &, bool) override {}
void removeSharedRecursive(const String &, bool) override {}
void clearDirectory(const String &) override {}
void moveDirectory(const String &, const String &) override {}
void removeDirectory(const String &) override {}
void setLastModified(const String &, const Poco::Timestamp &) override {}
/// Create part
void createFile(const String &) final override {}
void createDirectory(const String &) override {}
void createDirectories(const String &) override {}
void createHardLink(const String &, const String &) override {}
private:
String generateName() { return toString(UUIDHelpers::generateV4()); }
Poco::Logger * log;
String uri, name;
const String metadata_path;
SettingsPtr settings;
Metadata metadata;
};
}

View File

@ -33,10 +33,9 @@ IDiskRemote::Metadata::Metadata(
const String & disk_path_,
const String & metadata_file_path_,
bool create)
: remote_fs_root_path(remote_fs_root_path_)
: RemoteMetadata(remote_fs_root_path_, metadata_file_path_)
, disk_path(disk_path_)
, metadata_file_path(metadata_file_path_)
, total_size(0), remote_fs_objects(0), ref_count(0)
, total_size(0), ref_count(0)
{
if (create)
return;
@ -416,7 +415,7 @@ void IDiskRemote::removeDirectory(const String & path)
DiskDirectoryIteratorPtr IDiskRemote::iterateDirectory(const String & path)
{
return std::make_unique<RemoteDiskDirectoryIterator>(metadata_path + path, path);
return std::make_unique<RemoteDiskDirectoryIterator>(fs::path(metadata_path) / path, path);
}

View File

@ -17,14 +17,8 @@ namespace fs = std::filesystem;
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
/// Helper class to collect paths into chunks of maximum size.
/// For diskS3 it is Aws::vector<ObjectIdentifier>, for diskHDFS it is std::vector<std::string>.
/// For diskWEBServer not implemented.
/// For s3 it is Aws::vector<ObjectIdentifier>, for hdfs it is std::vector<std::string>.
class RemoteFSPathKeeper
{
public:
@ -41,10 +35,10 @@ protected:
using RemoteFSPathKeeperPtr = std::shared_ptr<RemoteFSPathKeeper>;
/// Base Disk class for remote FS's, which are not posix-compatible.
/// Used to implement disks over s3, hdfs, web-server.
/// Base Disk class for remote FS's, which are not posix-compatible (DiskS3 and DiskHDFS)
class IDiskRemote : public IDisk
{
friend class DiskRemoteReservation;
public:
@ -55,50 +49,34 @@ public:
const String & log_name_,
size_t thread_pool_size);
/// Methods to manage local metadata of remote FS objects.
struct Metadata;
const String & getName() const final override { return name; }
const String & getPath() const final override { return metadata_path; }
Metadata readMeta(const String & path) const;
Metadata createMeta(const String & path) const;
Metadata readOrCreateMetaForWriting(const String & path, WriteMode mode);
/// Disk info
UInt64 getTotalSpace() const override { return std::numeric_limits<UInt64>::max(); }
const String & getName() const final override { return name; }
UInt64 getAvailableSpace() const override { return std::numeric_limits<UInt64>::max(); }
const String & getPath() const final override { return metadata_path; }
UInt64 getUnreservedSpace() const override { return std::numeric_limits<UInt64>::max(); }
UInt64 getTotalSpace() const final override { return std::numeric_limits<UInt64>::max(); }
UInt64 getAvailableSpace() const final override { return std::numeric_limits<UInt64>::max(); }
UInt64 getUnreservedSpace() const final override { return std::numeric_limits<UInt64>::max(); }
/// Read-only part
UInt64 getKeepingFreeSpace() const override { return 0; }
bool exists(const String & path) const override;
bool isFile(const String & path) const override;
void createFile(const String & path) override;
size_t getFileSize(const String & path) const override;
void listFiles(const String & path, std::vector<String> & file_names) override;
void setReadOnly(const String & path) override;
bool isDirectory(const String & path) const override;
DiskDirectoryIteratorPtr iterateDirectory(const String & path) override;
Poco::Timestamp getLastModified(const String & path) override;
ReservationPtr reserve(UInt64 bytes) override;
/// Write and modification part
void moveFile(const String & from_path, const String & to_path) override;
void replaceFile(const String & from_path, const String & to_path) override;
@ -113,42 +91,39 @@ public:
void removeSharedRecursive(const String & path, bool keep_in_remote_fs) override;
void listFiles(const String & path, std::vector<String> & file_names) override;
void setReadOnly(const String & path) override;
bool isDirectory(const String & path) const override;
void createDirectory(const String & path) override;
void createDirectories(const String & path) override;
void clearDirectory(const String & path) override;
void moveDirectory(const String & from_path, const String & to_path) override { moveFile(from_path, to_path); }
void removeDirectory(const String & path) override;
DiskDirectoryIteratorPtr iterateDirectory(const String & path) override;
void setLastModified(const String & path, const Poco::Timestamp & timestamp) override;
/// Overridden by disks s3 and hdfs.
virtual void removeFromRemoteFS(RemoteFSPathKeeperPtr /* fs_paths_keeper */)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Disk {} does not support removing remote files", getName());
}
/// Overridden by disks s3 and hdfs.
virtual RemoteFSPathKeeperPtr createFSPathKeeper() const
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Disk {} does not support FS paths keeper", getName());
}
/// Create part
void createFile(const String & path) final override;
void createDirectory(const String & path) override;
void createDirectories(const String & path) override;
Poco::Timestamp getLastModified(const String & path) override;
void createHardLink(const String & src_path, const String & dst_path) override;
ReservationPtr reserve(UInt64 bytes) override;
virtual void removeFromRemoteFS(RemoteFSPathKeeperPtr fs_paths_keeper) = 0;
virtual RemoteFSPathKeeperPtr createFSPathKeeper() const = 0;
protected:
Poco::Logger * log;
/// Disk name
const String name;
/// URL + root path to store files in remote FS.
const String remote_fs_root_path;
const String metadata_path;
@ -168,11 +143,29 @@ private:
using RemoteDiskPtr = std::shared_ptr<IDiskRemote>;
/// Remote FS (S3, HDFS, WEB-server) metadata file layout:
/// Minimum info, required to be passed to ReadIndirectBufferFromRemoteFS<T>
struct RemoteMetadata
{
using PathAndSize = std::pair<String, size_t>;
/// Remote FS objects paths and their sizes.
std::vector<PathAndSize> remote_fs_objects;
/// URI
const String & remote_fs_root_path;
/// Relative path to metadata file on local FS.
const String & metadata_file_path;
RemoteMetadata(const String & remote_fs_root_path_, const String & metadata_file_path_)
: remote_fs_root_path(remote_fs_root_path_), metadata_file_path(metadata_file_path_) {}
};
/// Remote FS (S3, HDFS) metadata file layout:
/// FS objects, their number and total size of all FS objects.
/// Each FS object represents a file path in remote FS and its size.
struct IDiskRemote::Metadata
struct IDiskRemote::Metadata : RemoteMetadata
{
/// Metadata file version.
static constexpr UInt32 VERSION_ABSOLUTE_PATHS = 1;
@ -181,21 +174,12 @@ struct IDiskRemote::Metadata
using PathAndSize = std::pair<String, size_t>;
/// Remote FS (S3, HDFS, WEB-server) root path (uri + files directory path).
const String & remote_fs_root_path;
/// Disk path.
const String & disk_path;
/// Relative path to metadata file on local FS.
String metadata_file_path;
/// Total size of all remote FS objects.
/// Total size of all remote FS (S3, HDFS) objects.
size_t total_size = 0;
/// Remote FS objects paths and their sizes.
std::vector<PathAndSize> remote_fs_objects;
/// Number of references (hardlinks) to this metadata file.
UInt32 ref_count = 0;

View File

@ -16,7 +16,7 @@ namespace ErrorCodes
template<typename T>
ReadIndirectBufferFromRemoteFS<T>::ReadIndirectBufferFromRemoteFS(
IDiskRemote::Metadata metadata_)
RemoteMetadata metadata_)
: metadata(std::move(metadata_))
{
}

View File

@ -17,7 +17,7 @@ template <typename T>
class ReadIndirectBufferFromRemoteFS : public ReadBufferFromFileBase
{
public:
ReadIndirectBufferFromRemoteFS(IDiskRemote::Metadata metadata_);
ReadIndirectBufferFromRemoteFS(RemoteMetadata metadata_);
off_t seek(off_t offset_, int whence) override;
@ -28,7 +28,7 @@ public:
virtual std::unique_ptr<T> createReadBuffer(const String & path) = 0;
protected:
IDiskRemote::Metadata metadata;
RemoteMetadata metadata;
private:
std::unique_ptr<T> initialize();

View File

@ -134,7 +134,7 @@ public:
std::unique_ptr<ReadBufferFromS3> createReadBuffer(const String & path) override
{
return std::make_unique<ReadBufferFromS3>(client_ptr, bucket, metadata.remote_fs_root_path + path, max_single_read_retries, buf_size);
return std::make_unique<ReadBufferFromS3>(client_ptr, bucket, fs::path(metadata.remote_fs_root_path) / path, max_single_read_retries, buf_size);
}
private:

View File

@ -13,6 +13,9 @@
#include <Interpreters/Context.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTLiteral.h>
#include <filesystem>
namespace fs = std::filesystem;
namespace DB
@ -67,7 +70,7 @@ SetOrJoinBlockOutputStream::SetOrJoinBlockOutputStream(
, backup_path(backup_path_)
, backup_tmp_path(backup_tmp_path_)
, backup_file_name(backup_file_name_)
, backup_buf(table_.disk->writeFile(backup_tmp_path + backup_file_name))
, backup_buf(table_.disk->writeFile(fs::path(backup_tmp_path) / backup_file_name))
, compressed_backup_buf(*backup_buf)
, backup_stream(compressed_backup_buf, 0, metadata_snapshot->getSampleBlock())
, persistent(persistent_)
@ -94,7 +97,7 @@ void SetOrJoinBlockOutputStream::writeSuffix()
backup_buf->next();
backup_buf->finalize();
table.disk->replaceFile(backup_tmp_path + backup_file_name, backup_path + backup_file_name);
table.disk->replaceFile(fs::path(backup_tmp_path) / backup_file_name, fs::path(backup_path) / backup_file_name);
}
}
@ -102,7 +105,7 @@ void SetOrJoinBlockOutputStream::writeSuffix()
BlockOutputStreamPtr StorageSetOrJoinBase::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr /*context*/)
{
UInt64 id = ++increment;
return std::make_shared<SetOrJoinBlockOutputStream>(*this, metadata_snapshot, path, path + "tmp/", toString(id) + ".bin", persistent);
return std::make_shared<SetOrJoinBlockOutputStream>(*this, metadata_snapshot, path, fs::path(path) / "tmp/", toString(id) + ".bin", persistent);
}
@ -161,7 +164,7 @@ void StorageSet::truncate(const ASTPtr &, const StorageMetadataPtr & metadata_sn
{
disk->removeRecursive(path);
disk->createDirectories(path);
disk->createDirectories(path + "tmp/");
disk->createDirectories(fs::path(path) / "tmp/");
Block header = metadata_snapshot->getSampleBlock();
header = header.sortColumns();
@ -174,9 +177,9 @@ void StorageSet::truncate(const ASTPtr &, const StorageMetadataPtr & metadata_sn
void StorageSetOrJoinBase::restore()
{
if (!disk->exists(path + "tmp/"))
if (!disk->exists(fs::path(path) / "tmp/"))
{
disk->createDirectories(path + "tmp/");
disk->createDirectories(fs::path(path) / "tmp/");
return;
}