mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-28 02:21:59 +00:00
Get rid of IDiskRemote
This commit is contained in:
parent
c87c3fcfd9
commit
92c15ec97c
@ -4,7 +4,6 @@
|
|||||||
|
|
||||||
#if USE_AZURE_BLOB_STORAGE
|
#if USE_AZURE_BLOB_STORAGE
|
||||||
|
|
||||||
#include <Disks/IDiskRemote.h>
|
|
||||||
#include <azure/storage/blobs.hpp>
|
#include <azure/storage/blobs.hpp>
|
||||||
#include <Disks/AzureObjectStorage.h>
|
#include <Disks/AzureObjectStorage.h>
|
||||||
|
|
||||||
|
@ -15,6 +15,7 @@
|
|||||||
#include <Disks/IO/ThreadPoolRemoteFSReader.h>
|
#include <Disks/IO/ThreadPoolRemoteFSReader.h>
|
||||||
#include <Common/FileCache.h>
|
#include <Common/FileCache.h>
|
||||||
|
|
||||||
|
#include <Poco/Util/AbstractConfiguration.h>
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
|
@ -9,8 +9,13 @@
|
|||||||
#include <IO/ReadHelpers.h>
|
#include <IO/ReadHelpers.h>
|
||||||
#include <IO/WriteHelpers.h>
|
#include <IO/WriteHelpers.h>
|
||||||
|
|
||||||
#include <Disks/IDiskRemote.h>
|
#include <Disks/IDisk.h>
|
||||||
|
#include <Disks/IObjectStorage.h>
|
||||||
|
#include <IO/ReadBufferFromFile.h>
|
||||||
|
|
||||||
#include <Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h>
|
#include <Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h>
|
||||||
|
|
||||||
|
|
||||||
#include <Disks/IO/ReadIndirectBufferFromRemoteFS.h>
|
#include <Disks/IO/ReadIndirectBufferFromRemoteFS.h>
|
||||||
#include <Disks/IO/WriteIndirectBufferFromRemoteFS.h>
|
#include <Disks/IO/WriteIndirectBufferFromRemoteFS.h>
|
||||||
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
|
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
|
||||||
@ -173,7 +178,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskWebServer::readFile(const String & p
|
|||||||
|
|
||||||
if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
|
if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
|
||||||
{
|
{
|
||||||
auto reader = IDiskRemote::getThreadPoolReader();
|
auto reader = IObjectStorage::getThreadPoolReader();
|
||||||
return std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(reader, read_settings, std::move(web_impl), min_bytes_for_seek);
|
return std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(reader, read_settings, std::move(web_impl), min_bytes_for_seek);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
@ -1,10 +1,13 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include <Disks/IDiskRemote.h>
|
|
||||||
#include <IO/WriteBufferFromFile.h>
|
#include <IO/WriteBufferFromFile.h>
|
||||||
#include <Core/UUID.h>
|
#include <Core/UUID.h>
|
||||||
#include <set>
|
#include <set>
|
||||||
|
|
||||||
|
#include <Interpreters/Context_fwd.h>
|
||||||
|
#include <Disks/IDisk.h>
|
||||||
|
#include <IO/ReadBufferFromFile.h>
|
||||||
|
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
|
@ -1,8 +0,0 @@
|
|||||||
#pragma once
|
|
||||||
|
|
||||||
#include
|
|
||||||
|
|
||||||
namespace DB
|
|
||||||
{
|
|
||||||
|
|
||||||
}
|
|
@ -1,702 +0,0 @@
|
|||||||
#include <Disks/IDiskRemote.h>
|
|
||||||
|
|
||||||
#include "Disks/DiskFactory.h"
|
|
||||||
#include <IO/ReadBufferFromFile.h>
|
|
||||||
#include <IO/ReadHelpers.h>
|
|
||||||
#include <IO/WriteBufferFromFile.h>
|
|
||||||
#include <IO/WriteHelpers.h>
|
|
||||||
#include <Common/createHardLink.h>
|
|
||||||
#include <Common/quoteString.h>
|
|
||||||
#include <Common/logger_useful.h>
|
|
||||||
#include <Common/checkStackSize.h>
|
|
||||||
#include <boost/algorithm/string.hpp>
|
|
||||||
#include <Common/filesystemHelpers.h>
|
|
||||||
#include <Disks/IO/ThreadPoolRemoteFSReader.h>
|
|
||||||
#include <Common/FileCache.h>
|
|
||||||
|
|
||||||
|
|
||||||
namespace DB
|
|
||||||
{
|
|
||||||
|
|
||||||
namespace ErrorCodes
|
|
||||||
{
|
|
||||||
extern const int INCORRECT_DISK_INDEX;
|
|
||||||
extern const int UNKNOWN_FORMAT;
|
|
||||||
extern const int FILE_ALREADY_EXISTS;
|
|
||||||
extern const int PATH_ACCESS_DENIED;;
|
|
||||||
extern const int FILE_DOESNT_EXIST;
|
|
||||||
extern const int BAD_FILE_TYPE;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
IDiskRemote::Metadata IDiskRemote::Metadata::readMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_)
|
|
||||||
{
|
|
||||||
Metadata result(remote_fs_root_path_, metadata_disk_, metadata_file_path_);
|
|
||||||
result.load();
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
IDiskRemote::Metadata IDiskRemote::Metadata::createAndStoreMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync)
|
|
||||||
{
|
|
||||||
Metadata result(remote_fs_root_path_, metadata_disk_, metadata_file_path_);
|
|
||||||
result.save(sync);
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
IDiskRemote::Metadata IDiskRemote::Metadata::readUpdateAndStoreMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, IDiskRemote::MetadataUpdater updater)
|
|
||||||
{
|
|
||||||
Metadata result(remote_fs_root_path_, metadata_disk_, metadata_file_path_);
|
|
||||||
result.load();
|
|
||||||
if (updater(result))
|
|
||||||
result.save(sync);
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
IDiskRemote::Metadata IDiskRemote::Metadata::createUpdateAndStoreMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, IDiskRemote::MetadataUpdater updater)
|
|
||||||
{
|
|
||||||
Metadata result(remote_fs_root_path_, metadata_disk_, metadata_file_path_);
|
|
||||||
updater(result);
|
|
||||||
result.save(sync);
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
IDiskRemote::Metadata IDiskRemote::Metadata::readUpdateStoreMetadataAndRemove(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, IDiskRemote::MetadataUpdater updater)
|
|
||||||
{
|
|
||||||
Metadata result(remote_fs_root_path_, metadata_disk_, metadata_file_path_);
|
|
||||||
result.load();
|
|
||||||
if (updater(result))
|
|
||||||
result.save(sync);
|
|
||||||
metadata_disk_->removeFile(metadata_file_path_);
|
|
||||||
|
|
||||||
return result;
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
IDiskRemote::Metadata IDiskRemote::Metadata::createAndStoreMetadataIfNotExists(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, bool overwrite)
|
|
||||||
{
|
|
||||||
if (overwrite || !metadata_disk_->exists(metadata_file_path_))
|
|
||||||
{
|
|
||||||
return createAndStoreMetadata(remote_fs_root_path_, metadata_disk_, metadata_file_path_, sync);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
auto result = readMetadata(remote_fs_root_path_, metadata_disk_, metadata_file_path_);
|
|
||||||
if (result.read_only)
|
|
||||||
throw Exception("File is read-only: " + metadata_file_path_, ErrorCodes::PATH_ACCESS_DENIED);
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void IDiskRemote::Metadata::load()
|
|
||||||
{
|
|
||||||
const ReadSettings read_settings;
|
|
||||||
auto buf = metadata_disk->readFile(metadata_file_path, read_settings, 1024); /* reasonable buffer size for small file */
|
|
||||||
|
|
||||||
UInt32 version;
|
|
||||||
readIntText(version, *buf);
|
|
||||||
|
|
||||||
if (version < VERSION_ABSOLUTE_PATHS || version > VERSION_READ_ONLY_FLAG)
|
|
||||||
throw Exception(
|
|
||||||
ErrorCodes::UNKNOWN_FORMAT,
|
|
||||||
"Unknown metadata file version. Path: {}. Version: {}. Maximum expected version: {}",
|
|
||||||
metadata_disk->getPath() + metadata_file_path, toString(version), toString(VERSION_READ_ONLY_FLAG));
|
|
||||||
|
|
||||||
assertChar('\n', *buf);
|
|
||||||
|
|
||||||
UInt32 remote_fs_objects_count;
|
|
||||||
readIntText(remote_fs_objects_count, *buf);
|
|
||||||
assertChar('\t', *buf);
|
|
||||||
readIntText(total_size, *buf);
|
|
||||||
assertChar('\n', *buf);
|
|
||||||
remote_fs_objects.resize(remote_fs_objects_count);
|
|
||||||
|
|
||||||
for (size_t i = 0; i < remote_fs_objects_count; ++i)
|
|
||||||
{
|
|
||||||
String remote_fs_object_path;
|
|
||||||
size_t remote_fs_object_size;
|
|
||||||
readIntText(remote_fs_object_size, *buf);
|
|
||||||
assertChar('\t', *buf);
|
|
||||||
readEscapedString(remote_fs_object_path, *buf);
|
|
||||||
if (version == VERSION_ABSOLUTE_PATHS)
|
|
||||||
{
|
|
||||||
if (!remote_fs_object_path.starts_with(remote_fs_root_path))
|
|
||||||
throw Exception(ErrorCodes::UNKNOWN_FORMAT,
|
|
||||||
"Path in metadata does not correspond to root path. Path: {}, root path: {}, disk path: {}",
|
|
||||||
remote_fs_object_path, remote_fs_root_path, metadata_disk->getPath());
|
|
||||||
|
|
||||||
remote_fs_object_path = remote_fs_object_path.substr(remote_fs_root_path.size());
|
|
||||||
}
|
|
||||||
assertChar('\n', *buf);
|
|
||||||
remote_fs_objects[i].relative_path = remote_fs_object_path;
|
|
||||||
remote_fs_objects[i].bytes_size = remote_fs_object_size;
|
|
||||||
}
|
|
||||||
|
|
||||||
readIntText(ref_count, *buf);
|
|
||||||
assertChar('\n', *buf);
|
|
||||||
|
|
||||||
if (version >= VERSION_READ_ONLY_FLAG)
|
|
||||||
{
|
|
||||||
readBoolText(read_only, *buf);
|
|
||||||
assertChar('\n', *buf);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Load metadata by path or create empty if `create` flag is set.
|
|
||||||
IDiskRemote::Metadata::Metadata(
|
|
||||||
const String & remote_fs_root_path_,
|
|
||||||
DiskPtr metadata_disk_,
|
|
||||||
const String & metadata_file_path_)
|
|
||||||
: remote_fs_root_path(remote_fs_root_path_)
|
|
||||||
, metadata_file_path(metadata_file_path_)
|
|
||||||
, metadata_disk(metadata_disk_)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
void IDiskRemote::Metadata::addObject(const String & path, size_t size)
|
|
||||||
{
|
|
||||||
total_size += size;
|
|
||||||
remote_fs_objects.emplace_back(path, size);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void IDiskRemote::Metadata::saveToBuffer(WriteBuffer & buf, bool sync)
|
|
||||||
{
|
|
||||||
writeIntText(VERSION_RELATIVE_PATHS, buf);
|
|
||||||
writeChar('\n', buf);
|
|
||||||
|
|
||||||
writeIntText(remote_fs_objects.size(), buf);
|
|
||||||
writeChar('\t', buf);
|
|
||||||
writeIntText(total_size, buf);
|
|
||||||
writeChar('\n', buf);
|
|
||||||
|
|
||||||
for (const auto & [remote_fs_object_path, remote_fs_object_size] : remote_fs_objects)
|
|
||||||
{
|
|
||||||
writeIntText(remote_fs_object_size, buf);
|
|
||||||
writeChar('\t', buf);
|
|
||||||
writeEscapedString(remote_fs_object_path, buf);
|
|
||||||
writeChar('\n', buf);
|
|
||||||
}
|
|
||||||
|
|
||||||
writeIntText(ref_count, buf);
|
|
||||||
writeChar('\n', buf);
|
|
||||||
|
|
||||||
writeBoolText(read_only, buf);
|
|
||||||
writeChar('\n', buf);
|
|
||||||
|
|
||||||
buf.finalize();
|
|
||||||
if (sync)
|
|
||||||
buf.sync();
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Fsync metadata file if 'sync' flag is set.
|
|
||||||
void IDiskRemote::Metadata::save(bool sync)
|
|
||||||
{
|
|
||||||
auto buf = metadata_disk->writeFile(metadata_file_path, 1024);
|
|
||||||
saveToBuffer(*buf, sync);
|
|
||||||
}
|
|
||||||
|
|
||||||
std::string IDiskRemote::Metadata::serializeToString()
|
|
||||||
{
|
|
||||||
WriteBufferFromOwnString write_buf;
|
|
||||||
saveToBuffer(write_buf, false);
|
|
||||||
return write_buf.str();
|
|
||||||
}
|
|
||||||
|
|
||||||
IDiskRemote::Metadata IDiskRemote::readMetadataUnlocked(const String & path, std::shared_lock<std::shared_mutex> &) const
|
|
||||||
{
|
|
||||||
return Metadata::readMetadata(remote_fs_root_path, metadata_disk, path);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
IDiskRemote::Metadata IDiskRemote::readMetadata(const String & path) const
|
|
||||||
{
|
|
||||||
std::shared_lock lock(metadata_mutex);
|
|
||||||
return readMetadataUnlocked(path, lock);
|
|
||||||
}
|
|
||||||
|
|
||||||
IDiskRemote::Metadata IDiskRemote::readUpdateAndStoreMetadata(const String & path, bool sync, IDiskRemote::MetadataUpdater updater)
|
|
||||||
{
|
|
||||||
std::unique_lock lock(metadata_mutex);
|
|
||||||
return Metadata::readUpdateAndStoreMetadata(remote_fs_root_path, metadata_disk, path, sync, updater);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
IDiskRemote::Metadata IDiskRemote::readUpdateStoreMetadataAndRemove(const String & path, bool sync, IDiskRemote::MetadataUpdater updater)
|
|
||||||
{
|
|
||||||
std::unique_lock lock(metadata_mutex);
|
|
||||||
return Metadata::readUpdateStoreMetadataAndRemove(remote_fs_root_path, metadata_disk, path, sync, updater);
|
|
||||||
}
|
|
||||||
|
|
||||||
IDiskRemote::Metadata IDiskRemote::readOrCreateUpdateAndStoreMetadata(const String & path, WriteMode mode, bool sync, IDiskRemote::MetadataUpdater updater)
|
|
||||||
{
|
|
||||||
if (mode == WriteMode::Rewrite || !metadata_disk->exists(path))
|
|
||||||
{
|
|
||||||
std::unique_lock lock(metadata_mutex);
|
|
||||||
return Metadata::createUpdateAndStoreMetadata(remote_fs_root_path, metadata_disk, path, sync, updater);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
return Metadata::readUpdateAndStoreMetadata(remote_fs_root_path, metadata_disk, path, sync, updater);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
IDiskRemote::Metadata IDiskRemote::createAndStoreMetadata(const String & path, bool sync)
|
|
||||||
{
|
|
||||||
return Metadata::createAndStoreMetadata(remote_fs_root_path, metadata_disk, path, sync);
|
|
||||||
}
|
|
||||||
|
|
||||||
IDiskRemote::Metadata IDiskRemote::createUpdateAndStoreMetadata(const String & path, bool sync, IDiskRemote::MetadataUpdater updater)
|
|
||||||
{
|
|
||||||
return Metadata::createUpdateAndStoreMetadata(remote_fs_root_path, metadata_disk, path, sync, updater);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
std::unordered_map<String, String> IDiskRemote::getSerializedMetadata(const std::vector<std::string> & file_paths) const
|
|
||||||
{
|
|
||||||
std::unordered_map<String, String> metadatas;
|
|
||||||
|
|
||||||
std::shared_lock lock(metadata_mutex);
|
|
||||||
|
|
||||||
for (const auto & path : file_paths)
|
|
||||||
{
|
|
||||||
IDiskRemote::Metadata metadata = readMetadataUnlocked(path, lock);
|
|
||||||
metadata.ref_count = 0;
|
|
||||||
metadatas[path] = metadata.serializeToString();
|
|
||||||
}
|
|
||||||
|
|
||||||
return metadatas;
|
|
||||||
}
|
|
||||||
|
|
||||||
void IDiskRemote::removeMetadata(const String & path, std::vector<String> & paths_to_remove)
|
|
||||||
{
|
|
||||||
LOG_TRACE(log, "Remove file by path: {}", backQuote(metadata_disk->getPath() + path));
|
|
||||||
|
|
||||||
if (!metadata_disk->exists(path))
|
|
||||||
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "Metadata path '{}' doesn't exist", path);
|
|
||||||
|
|
||||||
if (!metadata_disk->isFile(path))
|
|
||||||
throw Exception(ErrorCodes::BAD_FILE_TYPE, "Path '{}' is not a regular file", path);
|
|
||||||
|
|
||||||
try
|
|
||||||
{
|
|
||||||
auto metadata_updater = [&paths_to_remove, this] (Metadata & metadata)
|
|
||||||
{
|
|
||||||
if (metadata.ref_count == 0)
|
|
||||||
{
|
|
||||||
for (const auto & [remote_fs_object_path, _] : metadata.remote_fs_objects)
|
|
||||||
{
|
|
||||||
|
|
||||||
paths_to_remove.push_back(remote_fs_root_path + remote_fs_object_path);
|
|
||||||
|
|
||||||
if (cache)
|
|
||||||
{
|
|
||||||
auto key = cache->hash(remote_fs_object_path);
|
|
||||||
cache->remove(key);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
else /// In other case decrement number of references, save metadata and delete hardlink.
|
|
||||||
{
|
|
||||||
--metadata.ref_count;
|
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
|
||||||
};
|
|
||||||
|
|
||||||
readUpdateStoreMetadataAndRemove(path, false, metadata_updater);
|
|
||||||
/// If there is no references - delete content from remote FS.
|
|
||||||
}
|
|
||||||
catch (const Exception & e)
|
|
||||||
{
|
|
||||||
/// If it's impossible to read meta - just remove it from FS.
|
|
||||||
if (e.code() == ErrorCodes::UNKNOWN_FORMAT)
|
|
||||||
{
|
|
||||||
LOG_WARNING(log,
|
|
||||||
"Metadata file {} can't be read by reason: {}. Removing it forcibly.",
|
|
||||||
backQuote(path), e.nested() ? e.nested()->message() : e.message());
|
|
||||||
metadata_disk->removeFile(path);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
throw;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void IDiskRemote::removeMetadataRecursive(const String & path, std::unordered_map<String, std::vector<String>> & paths_to_remove)
|
|
||||||
{
|
|
||||||
checkStackSize(); /// This is needed to prevent stack overflow in case of cyclic symlinks.
|
|
||||||
|
|
||||||
if (metadata_disk->isFile(path))
|
|
||||||
{
|
|
||||||
removeMetadata(path, paths_to_remove[path]);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
for (auto it = iterateDirectory(path); it->isValid(); it->next())
|
|
||||||
removeMetadataRecursive(it->path(), paths_to_remove);
|
|
||||||
|
|
||||||
metadata_disk->removeDirectory(path);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
std::vector<String> IDiskRemote::getRemotePaths(const String & local_path) const
|
|
||||||
{
|
|
||||||
auto metadata = readMetadata(local_path);
|
|
||||||
|
|
||||||
std::vector<String> remote_paths;
|
|
||||||
for (const auto & [remote_path, _] : metadata.remote_fs_objects)
|
|
||||||
remote_paths.push_back(fs::path(metadata.remote_fs_root_path) / remote_path);
|
|
||||||
|
|
||||||
return remote_paths;
|
|
||||||
}
|
|
||||||
|
|
||||||
void IDiskRemote::getRemotePathsRecursive(const String & local_path, std::vector<LocalPathWithRemotePaths> & paths_map)
|
|
||||||
{
|
|
||||||
/// Protect against concurrent delition of files (for example because of a merge).
|
|
||||||
if (metadata_disk->isFile(local_path))
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
paths_map.emplace_back(local_path, getRemotePaths(local_path));
|
|
||||||
}
|
|
||||||
catch (const Exception & e)
|
|
||||||
{
|
|
||||||
if (e.code() == ErrorCodes::FILE_DOESNT_EXIST)
|
|
||||||
return;
|
|
||||||
throw;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
DiskDirectoryIteratorPtr it;
|
|
||||||
try
|
|
||||||
{
|
|
||||||
it = iterateDirectory(local_path);
|
|
||||||
}
|
|
||||||
catch (const fs::filesystem_error & e)
|
|
||||||
{
|
|
||||||
if (e.code() == std::errc::no_such_file_or_directory)
|
|
||||||
return;
|
|
||||||
throw;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (; it->isValid(); it->next())
|
|
||||||
IDiskRemote::getRemotePathsRecursive(fs::path(local_path) / it->name(), paths_map);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
DiskPtr DiskRemoteReservation::getDisk(size_t i) const
|
|
||||||
{
|
|
||||||
if (i != 0)
|
|
||||||
throw Exception("Can't use i != 0 with single disk reservation", ErrorCodes::INCORRECT_DISK_INDEX);
|
|
||||||
return disk;
|
|
||||||
}
|
|
||||||
|
|
||||||
void DiskRemoteReservation::update(UInt64 new_size)
|
|
||||||
{
|
|
||||||
std::lock_guard lock(disk->reservation_mutex);
|
|
||||||
disk->reserved_bytes -= size;
|
|
||||||
size = new_size;
|
|
||||||
disk->reserved_bytes += size;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
DiskRemoteReservation::~DiskRemoteReservation()
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
std::lock_guard lock(disk->reservation_mutex);
|
|
||||||
if (disk->reserved_bytes < size)
|
|
||||||
{
|
|
||||||
disk->reserved_bytes = 0;
|
|
||||||
LOG_ERROR(disk->log, "Unbalanced reservations size for disk '{}'.", disk->getName());
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
disk->reserved_bytes -= size;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (disk->reservation_count == 0)
|
|
||||||
LOG_ERROR(disk->log, "Unbalanced reservation count for disk '{}'.", disk->getName());
|
|
||||||
else
|
|
||||||
--disk->reservation_count;
|
|
||||||
}
|
|
||||||
catch (...)
|
|
||||||
{
|
|
||||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
IDiskRemote::IDiskRemote(
|
|
||||||
const String & name_,
|
|
||||||
const String & remote_fs_root_path_,
|
|
||||||
DiskPtr metadata_disk_,
|
|
||||||
FileCachePtr cache_,
|
|
||||||
const String & log_name_,
|
|
||||||
size_t thread_pool_size)
|
|
||||||
: IDisk(std::make_unique<AsyncExecutor>(log_name_, thread_pool_size))
|
|
||||||
, log(&Poco::Logger::get(log_name_))
|
|
||||||
, name(name_)
|
|
||||||
, remote_fs_root_path(remote_fs_root_path_)
|
|
||||||
, metadata_disk(metadata_disk_)
|
|
||||||
, cache(cache_)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
String IDiskRemote::getCacheBasePath() const
|
|
||||||
{
|
|
||||||
return cache ? cache->getBasePath() : "";
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
bool IDiskRemote::exists(const String & path) const
|
|
||||||
{
|
|
||||||
return metadata_disk->exists(path);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
bool IDiskRemote::isFile(const String & path) const
|
|
||||||
{
|
|
||||||
return metadata_disk->isFile(path);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void IDiskRemote::createFile(const String & path)
|
|
||||||
{
|
|
||||||
createAndStoreMetadata(path, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
size_t IDiskRemote::getFileSize(const String & path) const
|
|
||||||
{
|
|
||||||
return readMetadata(path).total_size;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void IDiskRemote::moveFile(const String & from_path, const String & to_path)
|
|
||||||
{
|
|
||||||
if (exists(to_path))
|
|
||||||
throw Exception("File already exists: " + to_path, ErrorCodes::FILE_ALREADY_EXISTS);
|
|
||||||
|
|
||||||
metadata_disk->moveFile(from_path, to_path);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void IDiskRemote::replaceFile(const String & from_path, const String & to_path)
|
|
||||||
{
|
|
||||||
if (exists(to_path))
|
|
||||||
{
|
|
||||||
const String tmp_path = to_path + ".old";
|
|
||||||
moveFile(to_path, tmp_path);
|
|
||||||
moveFile(from_path, to_path);
|
|
||||||
removeFile(tmp_path);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
moveFile(from_path, to_path);
|
|
||||||
}
|
|
||||||
|
|
||||||
void IDiskRemote::removeSharedFile(const String & path, bool delete_metadata_only)
|
|
||||||
{
|
|
||||||
std::vector<String> paths_to_remove;
|
|
||||||
removeMetadata(path, paths_to_remove);
|
|
||||||
|
|
||||||
if (!delete_metadata_only)
|
|
||||||
removeFromRemoteFS(paths_to_remove);
|
|
||||||
}
|
|
||||||
|
|
||||||
void IDiskRemote::removeSharedFileIfExists(const String & path, bool delete_metadata_only)
|
|
||||||
{
|
|
||||||
std::vector<String> paths_to_remove;
|
|
||||||
if (metadata_disk->exists(path))
|
|
||||||
{
|
|
||||||
removeMetadata(path, paths_to_remove);
|
|
||||||
if (!delete_metadata_only)
|
|
||||||
removeFromRemoteFS(paths_to_remove);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void IDiskRemote::removeSharedFiles(const RemoveBatchRequest & files, bool keep_all_batch_data, const NameSet & file_names_remove_metadata_only)
|
|
||||||
{
|
|
||||||
std::unordered_map<String, std::vector<String>> paths_to_remove;
|
|
||||||
for (const auto & file : files)
|
|
||||||
{
|
|
||||||
bool skip = file.if_exists && !metadata_disk->exists(file.path);
|
|
||||||
if (!skip)
|
|
||||||
removeMetadata(file.path, paths_to_remove[file.path]);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!keep_all_batch_data)
|
|
||||||
{
|
|
||||||
std::vector<String> remove_from_remote;
|
|
||||||
for (auto && [path, remote_paths] : paths_to_remove)
|
|
||||||
{
|
|
||||||
if (!file_names_remove_metadata_only.contains(fs::path(path).filename()))
|
|
||||||
remove_from_remote.insert(remove_from_remote.end(), remote_paths.begin(), remote_paths.end());
|
|
||||||
}
|
|
||||||
removeFromRemoteFS(remove_from_remote);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void IDiskRemote::removeSharedRecursive(const String & path, bool keep_all_batch_data, const NameSet & file_names_remove_metadata_only)
|
|
||||||
{
|
|
||||||
std::unordered_map<String, std::vector<String>> paths_to_remove;
|
|
||||||
removeMetadataRecursive(path, paths_to_remove);
|
|
||||||
|
|
||||||
if (!keep_all_batch_data)
|
|
||||||
{
|
|
||||||
std::vector<String> remove_from_remote;
|
|
||||||
for (auto && [local_path, remote_paths] : paths_to_remove)
|
|
||||||
{
|
|
||||||
if (!file_names_remove_metadata_only.contains(fs::path(local_path).filename()))
|
|
||||||
remove_from_remote.insert(remove_from_remote.end(), remote_paths.begin(), remote_paths.end());
|
|
||||||
}
|
|
||||||
removeFromRemoteFS(remove_from_remote);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void IDiskRemote::setReadOnly(const String & path)
|
|
||||||
{
|
|
||||||
/// We should store read only flag inside metadata file (instead of using FS flag),
|
|
||||||
/// because we modify metadata file when create hard-links from it.
|
|
||||||
readUpdateAndStoreMetadata(path, false, [] (Metadata & metadata) { metadata.read_only = true; return true; });
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
bool IDiskRemote::isDirectory(const String & path) const
|
|
||||||
{
|
|
||||||
return metadata_disk->isDirectory(path);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void IDiskRemote::createDirectory(const String & path)
|
|
||||||
{
|
|
||||||
metadata_disk->createDirectory(path);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void IDiskRemote::createDirectories(const String & path)
|
|
||||||
{
|
|
||||||
metadata_disk->createDirectories(path);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void IDiskRemote::clearDirectory(const String & path)
|
|
||||||
{
|
|
||||||
for (auto it = iterateDirectory(path); it->isValid(); it->next())
|
|
||||||
if (isFile(it->path()))
|
|
||||||
removeFile(it->path());
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void IDiskRemote::removeDirectory(const String & path)
|
|
||||||
{
|
|
||||||
metadata_disk->removeDirectory(path);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
DiskDirectoryIteratorPtr IDiskRemote::iterateDirectory(const String & path)
|
|
||||||
{
|
|
||||||
return metadata_disk->iterateDirectory(path);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void IDiskRemote::listFiles(const String & path, std::vector<String> & file_names)
|
|
||||||
{
|
|
||||||
for (auto it = iterateDirectory(path); it->isValid(); it->next())
|
|
||||||
file_names.push_back(it->name());
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void IDiskRemote::setLastModified(const String & path, const Poco::Timestamp & timestamp)
|
|
||||||
{
|
|
||||||
metadata_disk->setLastModified(path, timestamp);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
Poco::Timestamp IDiskRemote::getLastModified(const String & path)
|
|
||||||
{
|
|
||||||
return metadata_disk->getLastModified(path);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void IDiskRemote::createHardLink(const String & src_path, const String & dst_path)
|
|
||||||
{
|
|
||||||
readUpdateAndStoreMetadata(src_path, false, [] (Metadata & metadata) { metadata.ref_count++; return true; });
|
|
||||||
|
|
||||||
/// Create FS hardlink to metadata file.
|
|
||||||
metadata_disk->createHardLink(src_path, dst_path);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
ReservationPtr IDiskRemote::reserve(UInt64 bytes)
|
|
||||||
{
|
|
||||||
if (!tryReserve(bytes))
|
|
||||||
return {};
|
|
||||||
|
|
||||||
return std::make_unique<DiskRemoteReservation>(std::static_pointer_cast<IDiskRemote>(shared_from_this()), bytes);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
bool IDiskRemote::tryReserve(UInt64 bytes)
|
|
||||||
{
|
|
||||||
std::lock_guard lock(reservation_mutex);
|
|
||||||
if (bytes == 0)
|
|
||||||
{
|
|
||||||
LOG_TRACE(log, "Reserving 0 bytes on remote_fs disk {}", backQuote(name));
|
|
||||||
++reservation_count;
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
auto available_space = getAvailableSpace();
|
|
||||||
UInt64 unreserved_space = available_space - std::min(available_space, reserved_bytes);
|
|
||||||
if (unreserved_space >= bytes)
|
|
||||||
{
|
|
||||||
LOG_TRACE(log, "Reserving {} on disk {}, having unreserved {}.",
|
|
||||||
ReadableSize(bytes), backQuote(name), ReadableSize(unreserved_space));
|
|
||||||
++reservation_count;
|
|
||||||
reserved_bytes += bytes;
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
String IDiskRemote::getUniqueId(const String & path) const
|
|
||||||
{
|
|
||||||
LOG_TRACE(log, "Remote path: {}, Path: {}", remote_fs_root_path, path);
|
|
||||||
auto metadata = readMetadata(path);
|
|
||||||
String id;
|
|
||||||
if (!metadata.remote_fs_objects.empty())
|
|
||||||
id = metadata.remote_fs_root_path + metadata.remote_fs_objects[0].relative_path;
|
|
||||||
return id;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
AsynchronousReaderPtr IDiskRemote::getThreadPoolReader()
|
|
||||||
{
|
|
||||||
constexpr size_t pool_size = 50;
|
|
||||||
constexpr size_t queue_size = 1000000;
|
|
||||||
static AsynchronousReaderPtr reader = std::make_shared<ThreadPoolRemoteFSReader>(pool_size, queue_size);
|
|
||||||
return reader;
|
|
||||||
}
|
|
||||||
|
|
||||||
UInt32 IDiskRemote::getRefCount(const String & path) const
|
|
||||||
{
|
|
||||||
return readMetadata(path).ref_count;
|
|
||||||
}
|
|
||||||
|
|
||||||
ThreadPool & IDiskRemote::getThreadPoolWriter()
|
|
||||||
{
|
|
||||||
constexpr size_t pool_size = 100;
|
|
||||||
constexpr size_t queue_size = 1000000;
|
|
||||||
static ThreadPool writer(pool_size, pool_size, queue_size);
|
|
||||||
return writer;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
@ -1,302 +0,0 @@
|
|||||||
#pragma once
|
|
||||||
|
|
||||||
#include <Common/config.h>
|
|
||||||
|
|
||||||
#include <atomic>
|
|
||||||
#include <Common/FileCache_fwd.h>
|
|
||||||
#include <Disks/DiskFactory.h>
|
|
||||||
#include <Disks/Executor.h>
|
|
||||||
#include <utility>
|
|
||||||
#include <mutex>
|
|
||||||
#include <shared_mutex>
|
|
||||||
#include <Common/MultiVersion.h>
|
|
||||||
#include <Common/ThreadPool.h>
|
|
||||||
#include <filesystem>
|
|
||||||
|
|
||||||
namespace CurrentMetrics
|
|
||||||
{
|
|
||||||
extern const Metric DiskSpaceReservedForMerge;
|
|
||||||
}
|
|
||||||
|
|
||||||
namespace DB
|
|
||||||
{
|
|
||||||
|
|
||||||
class IAsynchronousReader;
|
|
||||||
using AsynchronousReaderPtr = std::shared_ptr<IAsynchronousReader>;
|
|
||||||
|
|
||||||
|
|
||||||
/// Base Disk class for remote FS's, which are not posix-compatible (e.g. DiskS3, DiskHDFS, DiskBlobStorage)
|
|
||||||
class IDiskRemote : public IDisk
|
|
||||||
{
|
|
||||||
|
|
||||||
friend class DiskRemoteReservation;
|
|
||||||
|
|
||||||
public:
|
|
||||||
IDiskRemote(
|
|
||||||
const String & name_,
|
|
||||||
const String & remote_fs_root_path_,
|
|
||||||
DiskPtr metadata_disk_,
|
|
||||||
FileCachePtr cache_,
|
|
||||||
const String & log_name_,
|
|
||||||
size_t thread_pool_size);
|
|
||||||
|
|
||||||
struct Metadata;
|
|
||||||
using MetadataUpdater = std::function<bool(Metadata & metadata)>;
|
|
||||||
|
|
||||||
const String & getName() const final override { return name; }
|
|
||||||
|
|
||||||
const String & getPath() const final override { return metadata_disk->getPath(); }
|
|
||||||
|
|
||||||
String getCacheBasePath() const final override;
|
|
||||||
|
|
||||||
std::vector<String> getRemotePaths(const String & local_path) const final override;
|
|
||||||
|
|
||||||
void getRemotePathsRecursive(const String & local_path, std::vector<LocalPathWithRemotePaths> & paths_map) override;
|
|
||||||
|
|
||||||
/// Methods for working with metadata. For some operations (like hardlink
|
|
||||||
/// creation) metadata can be updated concurrently from multiple threads
|
|
||||||
/// (file actually rewritten on disk). So additional RW lock is required for
|
|
||||||
/// metadata read and write, but not for create new metadata.
|
|
||||||
Metadata readMetadata(const String & path) const;
|
|
||||||
Metadata readMetadataUnlocked(const String & path, std::shared_lock<std::shared_mutex> &) const;
|
|
||||||
Metadata readUpdateAndStoreMetadata(const String & path, bool sync, MetadataUpdater updater);
|
|
||||||
Metadata readUpdateStoreMetadataAndRemove(const String & path, bool sync, MetadataUpdater updater);
|
|
||||||
|
|
||||||
Metadata readOrCreateUpdateAndStoreMetadata(const String & path, WriteMode mode, bool sync, MetadataUpdater updater);
|
|
||||||
|
|
||||||
Metadata createAndStoreMetadata(const String & path, bool sync);
|
|
||||||
Metadata createUpdateAndStoreMetadata(const String & path, bool sync, MetadataUpdater updater);
|
|
||||||
|
|
||||||
UInt64 getTotalSpace() const override { return std::numeric_limits<UInt64>::max(); }
|
|
||||||
|
|
||||||
UInt64 getAvailableSpace() const override { return std::numeric_limits<UInt64>::max(); }
|
|
||||||
|
|
||||||
UInt64 getUnreservedSpace() const override { return std::numeric_limits<UInt64>::max(); }
|
|
||||||
|
|
||||||
UInt64 getKeepingFreeSpace() const override { return 0; }
|
|
||||||
|
|
||||||
bool exists(const String & path) const override;
|
|
||||||
|
|
||||||
bool isFile(const String & path) const override;
|
|
||||||
|
|
||||||
void createFile(const String & path) override;
|
|
||||||
|
|
||||||
size_t getFileSize(const String & path) const override;
|
|
||||||
|
|
||||||
void moveFile(const String & from_path, const String & to_path) override;
|
|
||||||
|
|
||||||
void replaceFile(const String & from_path, const String & to_path) override;
|
|
||||||
|
|
||||||
void removeFile(const String & path) override { removeSharedFile(path, false); }
|
|
||||||
|
|
||||||
void removeFileIfExists(const String & path) override { removeSharedFileIfExists(path, false); }
|
|
||||||
|
|
||||||
void removeRecursive(const String & path) override { removeSharedRecursive(path, false, {}); }
|
|
||||||
|
|
||||||
|
|
||||||
void removeSharedFile(const String & path, bool delete_metadata_only) override;
|
|
||||||
|
|
||||||
void removeSharedFileIfExists(const String & path, bool delete_metadata_only) override;
|
|
||||||
|
|
||||||
void removeSharedFiles(const RemoveBatchRequest & files, bool keep_all_batch_data, const NameSet & file_names_remove_metadata_only) override;
|
|
||||||
|
|
||||||
void removeSharedRecursive(const String & path, bool keep_all_batch_data, const NameSet & file_names_remove_metadata_only) override;
|
|
||||||
|
|
||||||
void listFiles(const String & path, std::vector<String> & file_names) override;
|
|
||||||
|
|
||||||
void setReadOnly(const String & path) override;
|
|
||||||
|
|
||||||
bool isDirectory(const String & path) const override;
|
|
||||||
|
|
||||||
void createDirectory(const String & path) override;
|
|
||||||
|
|
||||||
void createDirectories(const String & path) override;
|
|
||||||
|
|
||||||
void clearDirectory(const String & path) override;
|
|
||||||
|
|
||||||
void moveDirectory(const String & from_path, const String & to_path) override { moveFile(from_path, to_path); }
|
|
||||||
|
|
||||||
void removeDirectory(const String & path) override;
|
|
||||||
|
|
||||||
DiskDirectoryIteratorPtr iterateDirectory(const String & path) override;
|
|
||||||
|
|
||||||
void setLastModified(const String & path, const Poco::Timestamp & timestamp) override;
|
|
||||||
|
|
||||||
Poco::Timestamp getLastModified(const String & path) override;
|
|
||||||
|
|
||||||
void createHardLink(const String & src_path, const String & dst_path) override;
|
|
||||||
|
|
||||||
ReservationPtr reserve(UInt64 bytes) override;
|
|
||||||
|
|
||||||
String getUniqueId(const String & path) const override;
|
|
||||||
|
|
||||||
bool checkUniqueId(const String & id) const override = 0;
|
|
||||||
|
|
||||||
virtual void removeFromRemoteFS(const std::vector<String> & paths) = 0;
|
|
||||||
|
|
||||||
static AsynchronousReaderPtr getThreadPoolReader();
|
|
||||||
|
|
||||||
static ThreadPool & getThreadPoolWriter();
|
|
||||||
|
|
||||||
DiskPtr getMetadataDiskIfExistsOrSelf() override { return metadata_disk; }
|
|
||||||
|
|
||||||
UInt32 getRefCount(const String & path) const override;
|
|
||||||
|
|
||||||
/// Return metadata for each file path. Also, before serialization reset
|
|
||||||
/// ref_count for each metadata to zero. This function used only for remote
|
|
||||||
/// fetches/sends in replicated engines. That's why we reset ref_count to zero.
|
|
||||||
std::unordered_map<String, String> getSerializedMetadata(const std::vector<String> & file_paths) const override;
|
|
||||||
protected:
|
|
||||||
Poco::Logger * log;
|
|
||||||
const String name;
|
|
||||||
const String remote_fs_root_path;
|
|
||||||
|
|
||||||
DiskPtr metadata_disk;
|
|
||||||
|
|
||||||
FileCachePtr cache;
|
|
||||||
|
|
||||||
private:
|
|
||||||
void removeMetadata(const String & path, std::vector<String> & paths_to_remove);
|
|
||||||
|
|
||||||
void removeMetadataRecursive(const String & path, std::unordered_map<String, std::vector<String>> & paths_to_remove);
|
|
||||||
|
|
||||||
bool tryReserve(UInt64 bytes);
|
|
||||||
|
|
||||||
UInt64 reserved_bytes = 0;
|
|
||||||
UInt64 reservation_count = 0;
|
|
||||||
std::mutex reservation_mutex;
|
|
||||||
mutable std::shared_mutex metadata_mutex;
|
|
||||||
};
|
|
||||||
|
|
||||||
using RemoteDiskPtr = std::shared_ptr<IDiskRemote>;
|
|
||||||
|
|
||||||
/// Remote FS (S3, HDFS) metadata file layout:
|
|
||||||
/// FS objects, their number and total size of all FS objects.
|
|
||||||
/// Each FS object represents a file path in remote FS and its size.
|
|
||||||
|
|
||||||
struct IDiskRemote::Metadata
|
|
||||||
{
|
|
||||||
using Updater = std::function<bool(IDiskRemote::Metadata & metadata)>;
|
|
||||||
/// Metadata file version.
|
|
||||||
static constexpr UInt32 VERSION_ABSOLUTE_PATHS = 1;
|
|
||||||
static constexpr UInt32 VERSION_RELATIVE_PATHS = 2;
|
|
||||||
static constexpr UInt32 VERSION_READ_ONLY_FLAG = 3;
|
|
||||||
|
|
||||||
/// Remote FS objects paths and their sizes.
|
|
||||||
std::vector<BlobPathWithSize> remote_fs_objects;
|
|
||||||
|
|
||||||
/// URI
|
|
||||||
const String & remote_fs_root_path;
|
|
||||||
|
|
||||||
/// Relative path to metadata file on local FS.
|
|
||||||
const String metadata_file_path;
|
|
||||||
|
|
||||||
DiskPtr metadata_disk;
|
|
||||||
|
|
||||||
/// Total size of all remote FS (S3, HDFS) objects.
|
|
||||||
size_t total_size = 0;
|
|
||||||
|
|
||||||
/// Number of references (hardlinks) to this metadata file.
|
|
||||||
///
|
|
||||||
/// FIXME: Why we are tracking it explicetly, without
|
|
||||||
/// info from filesystem????
|
|
||||||
UInt32 ref_count = 0;
|
|
||||||
|
|
||||||
/// Flag indicates that file is read only.
|
|
||||||
bool read_only = false;
|
|
||||||
|
|
||||||
Metadata(
|
|
||||||
const String & remote_fs_root_path_,
|
|
||||||
DiskPtr metadata_disk_,
|
|
||||||
const String & metadata_file_path_);
|
|
||||||
|
|
||||||
void addObject(const String & path, size_t size);
|
|
||||||
|
|
||||||
static Metadata readMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_);
|
|
||||||
static Metadata readUpdateAndStoreMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, Updater updater);
|
|
||||||
static Metadata readUpdateStoreMetadataAndRemove(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, Updater updater);
|
|
||||||
|
|
||||||
static Metadata createAndStoreMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync);
|
|
||||||
static Metadata createUpdateAndStoreMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, Updater updater);
|
|
||||||
static Metadata createAndStoreMetadataIfNotExists(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, bool overwrite);
|
|
||||||
|
|
||||||
/// Serialize metadata to string (very same with saveToBuffer)
|
|
||||||
std::string serializeToString();
|
|
||||||
|
|
||||||
private:
|
|
||||||
/// Fsync metadata file if 'sync' flag is set.
|
|
||||||
void save(bool sync = false);
|
|
||||||
void saveToBuffer(WriteBuffer & buffer, bool sync);
|
|
||||||
void load();
|
|
||||||
};
|
|
||||||
|
|
||||||
class DiskRemoteReservation final : public IReservation
|
|
||||||
{
|
|
||||||
public:
|
|
||||||
DiskRemoteReservation(const RemoteDiskPtr & disk_, UInt64 size_)
|
|
||||||
: disk(disk_), size(size_), metric_increment(CurrentMetrics::DiskSpaceReservedForMerge, size_)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
UInt64 getSize() const override { return size; }
|
|
||||||
|
|
||||||
DiskPtr getDisk(size_t i) const override;
|
|
||||||
|
|
||||||
Disks getDisks() const override { return {disk}; }
|
|
||||||
|
|
||||||
void update(UInt64 new_size) override;
|
|
||||||
|
|
||||||
~DiskRemoteReservation() override;
|
|
||||||
|
|
||||||
private:
|
|
||||||
RemoteDiskPtr disk;
|
|
||||||
UInt64 size;
|
|
||||||
CurrentMetrics::Increment metric_increment;
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
/// Runs tasks asynchronously using thread pool.
|
|
||||||
class AsyncExecutor : public Executor
|
|
||||||
{
|
|
||||||
public:
|
|
||||||
explicit AsyncExecutor(const String & name_, int thread_pool_size)
|
|
||||||
: name(name_)
|
|
||||||
, pool(ThreadPool(thread_pool_size)) {}
|
|
||||||
|
|
||||||
std::future<void> execute(std::function<void()> task) override
|
|
||||||
{
|
|
||||||
auto promise = std::make_shared<std::promise<void>>();
|
|
||||||
pool.scheduleOrThrowOnError(
|
|
||||||
[promise, task]()
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
task();
|
|
||||||
promise->set_value();
|
|
||||||
}
|
|
||||||
catch (...)
|
|
||||||
{
|
|
||||||
tryLogCurrentException("Failed to run async task");
|
|
||||||
|
|
||||||
try
|
|
||||||
{
|
|
||||||
promise->set_exception(std::current_exception());
|
|
||||||
}
|
|
||||||
catch (...) {}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
return promise->get_future();
|
|
||||||
}
|
|
||||||
|
|
||||||
void setMaxThreads(size_t threads)
|
|
||||||
{
|
|
||||||
pool.setMaxThreads(threads);
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
|
||||||
String name;
|
|
||||||
ThreadPool pool;
|
|
||||||
};
|
|
||||||
|
|
||||||
}
|
|
@ -1,6 +1,5 @@
|
|||||||
#include "ReadBufferFromRemoteFSGather.h"
|
#include "ReadBufferFromRemoteFSGather.h"
|
||||||
|
|
||||||
#include <Disks/IDiskRemote.h>
|
|
||||||
#include <IO/SeekableReadBuffer.h>
|
#include <IO/SeekableReadBuffer.h>
|
||||||
#include <Disks/IO/ReadBufferFromWebServer.h>
|
#include <Disks/IO/ReadBufferFromWebServer.h>
|
||||||
|
|
||||||
|
@ -1,9 +1,9 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include <Common/config.h>
|
#include <Common/config.h>
|
||||||
#include <Disks/IDiskRemote.h>
|
|
||||||
#include <IO/ReadBufferFromFile.h>
|
#include <IO/ReadBufferFromFile.h>
|
||||||
#include <IO/ReadSettings.h>
|
#include <IO/ReadSettings.h>
|
||||||
|
#include <Disks/IObjectStorage.h>
|
||||||
|
|
||||||
#if USE_AZURE_BLOB_STORAGE
|
#if USE_AZURE_BLOB_STORAGE
|
||||||
#include <azure/storage/blobs.hpp>
|
#include <azure/storage/blobs.hpp>
|
||||||
|
@ -2,7 +2,6 @@
|
|||||||
|
|
||||||
#include <Common/config.h>
|
#include <Common/config.h>
|
||||||
#include <IO/ReadBufferFromFile.h>
|
#include <IO/ReadBufferFromFile.h>
|
||||||
#include <Disks/IDiskRemote.h>
|
|
||||||
#include <utility>
|
#include <utility>
|
||||||
|
|
||||||
|
|
||||||
|
@ -4,7 +4,6 @@
|
|||||||
#include <IO/SeekableReadBuffer.h>
|
#include <IO/SeekableReadBuffer.h>
|
||||||
#include <Common/ThreadPool.h>
|
#include <Common/ThreadPool.h>
|
||||||
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
|
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
|
||||||
#include <Disks/IDiskRemote.h>
|
|
||||||
|
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
|
@ -2,7 +2,6 @@
|
|||||||
|
|
||||||
#include <Common/config.h>
|
#include <Common/config.h>
|
||||||
|
|
||||||
#include <Disks/IDiskRemote.h>
|
|
||||||
#include <IO/WriteBufferFromFile.h>
|
#include <IO/WriteBufferFromFile.h>
|
||||||
#include <IO/WriteBufferFromFileDecorator.h>
|
#include <IO/WriteBufferFromFileDecorator.h>
|
||||||
|
|
||||||
|
@ -29,7 +29,7 @@
|
|||||||
#include <Storages/CompressionCodecSelector.h>
|
#include <Storages/CompressionCodecSelector.h>
|
||||||
#include <Storages/StorageS3Settings.h>
|
#include <Storages/StorageS3Settings.h>
|
||||||
#include <Disks/DiskLocal.h>
|
#include <Disks/DiskLocal.h>
|
||||||
#include <Disks/IDiskRemote.h>
|
#include <Disks/IObjectStorage.h>
|
||||||
#include <TableFunctions/TableFunctionFactory.h>
|
#include <TableFunctions/TableFunctionFactory.h>
|
||||||
#include <Interpreters/ActionLocksManager.h>
|
#include <Interpreters/ActionLocksManager.h>
|
||||||
#include <Interpreters/ExternalLoaderXMLConfigRepository.h>
|
#include <Interpreters/ExternalLoaderXMLConfigRepository.h>
|
||||||
@ -313,7 +313,7 @@ struct ContextSharedPart
|
|||||||
/// since it may use per-user MemoryTracker which will be destroyed here.
|
/// since it may use per-user MemoryTracker which will be destroyed here.
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
IDiskRemote::getThreadPoolWriter().wait();
|
IObjectStorage::getThreadPoolWriter().wait();
|
||||||
}
|
}
|
||||||
catch (...)
|
catch (...)
|
||||||
{
|
{
|
||||||
|
@ -1,7 +1,6 @@
|
|||||||
#include <Storages/MergeTree/DataPartsExchange.h>
|
#include <Storages/MergeTree/DataPartsExchange.h>
|
||||||
|
|
||||||
#include <Formats/NativeWriter.h>
|
#include <Formats/NativeWriter.h>
|
||||||
#include <Disks/IDiskRemote.h>
|
|
||||||
#include <Disks/SingleDiskVolume.h>
|
#include <Disks/SingleDiskVolume.h>
|
||||||
#include <Disks/createVolume.h>
|
#include <Disks/createVolume.h>
|
||||||
#include <IO/HTTPCommon.h>
|
#include <IO/HTTPCommon.h>
|
||||||
|
@ -1,7 +1,6 @@
|
|||||||
#include <Storages/System/StorageSystemDisks.h>
|
#include <Storages/System/StorageSystemDisks.h>
|
||||||
#include <Processors/Sources/SourceFromSingleChunk.h>
|
#include <Processors/Sources/SourceFromSingleChunk.h>
|
||||||
#include <Interpreters/Context.h>
|
#include <Interpreters/Context.h>
|
||||||
#include <Disks/IDiskRemote.h>
|
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
|
Loading…
Reference in New Issue
Block a user