Merge master

This commit is contained in:
kssenii 2022-06-10 12:49:10 +02:00
commit 498f389c21
37 changed files with 1530 additions and 474 deletions

View File

@ -705,3 +705,109 @@ target_compile_options(_crypto PRIVATE -Wno-gnu-anonymous-struct)
add_library(OpenSSL::Crypto ALIAS _crypto)
add_library(OpenSSL::SSL ALIAS _ssl)
# Helper function used in the populate_openssl_vars function below
function(from_hex HEX DEC)
string(TOUPPER "${HEX}" HEX)
set(_res 0)
string(LENGTH "${HEX}" _strlen)
while (_strlen GREATER 0)
math(EXPR _res "${_res} * 16")
string(SUBSTRING "${HEX}" 0 1 NIBBLE)
string(SUBSTRING "${HEX}" 1 -1 HEX)
if (NIBBLE STREQUAL "A")
math(EXPR _res "${_res} + 10")
elseif (NIBBLE STREQUAL "B")
math(EXPR _res "${_res} + 11")
elseif (NIBBLE STREQUAL "C")
math(EXPR _res "${_res} + 12")
elseif (NIBBLE STREQUAL "D")
math(EXPR _res "${_res} + 13")
elseif (NIBBLE STREQUAL "E")
math(EXPR _res "${_res} + 14")
elseif (NIBBLE STREQUAL "F")
math(EXPR _res "${_res} + 15")
else ()
math(EXPR _res "${_res} + ${NIBBLE}")
endif ()
string(LENGTH "${HEX}" _strlen)
endwhile ()
set(${DEC} ${_res} PARENT_SCOPE)
endfunction()
# ClickHouse uses BoringSSL which is a fork of OpenSSL.
# This populates CMAKE var OPENSSL_VERSION from the OPENSSL_VERSION_NUMBER defined
# in contrib/boringssl/include/openssl/base.h. It also sets the CMAKE var OPENSSL_IS_BORING_SSL
# if it's defined in the file. Both OPENSSL_VERSION and OPENSSL_IS_BORING_SSL variables will be
# used to populate flags in the `system.build_options` table for more context on ssl version used.
# This cmake script is adopted from FindOpenSSL cmake module and slightly modified for this use-case .
if (EXISTS "${BORINGSSL_SOURCE_DIR}/include/openssl/base.h")
file(STRINGS "${BORINGSSL_SOURCE_DIR}/include/openssl/base.h" openssl_version_str
REGEX "^#[\t ]*define[\t ]+OPENSSL_VERSION_NUMBER[\t ]+0x([0-9a-fA-F])+.*")
file(STRINGS "${BORINGSSL_SOURCE_DIR}/include/openssl/base.h" openssl_is_boringssl
REGEX "^#[\t ]*define[\t ]+OPENSSL_IS_BORINGSSL.*")
# Set to true if OPENSSL_IS_BORING_SSL is defined
if (openssl_is_boringssl)
set(OPENSSL_IS_BORING_SSL 1)
endif ()
# If openssl_version_str is defined extrapolate and set OPENSSL_VERSION
if (openssl_version_str)
# The version number is encoded as 0xMNNFFPPS: major minor fix patch status
# The status gives if this is a developer or prerelease and is ignored here.
# Major, minor, and fix directly translate into the version numbers shown in
# the string. The patch field translates to the single character suffix that
# indicates the bug fix state, which 00 -> nothing, 01 -> a, 02 -> b and so
# on.
string(REGEX REPLACE "^.*OPENSSL_VERSION_NUMBER[\t ]+0x([0-9a-fA-F])([0-9a-fA-F][0-9a-fA-F])([0-9a-fA-F][0-9a-fA-F])([0-9a-fA-F][0-9a-fA-F])([0-9a-fA-F]).*$"
"\\1;\\2;\\3;\\4;\\5" OPENSSL_VERSION_LIST "${openssl_version_str}")
list(GET OPENSSL_VERSION_LIST 0 OPENSSL_VERSION_MAJOR)
list(GET OPENSSL_VERSION_LIST 1 OPENSSL_VERSION_MINOR)
from_hex("${OPENSSL_VERSION_MINOR}" OPENSSL_VERSION_MINOR)
list(GET OPENSSL_VERSION_LIST 2 OPENSSL_VERSION_FIX)
from_hex("${OPENSSL_VERSION_FIX}" OPENSSL_VERSION_FIX)
list(GET OPENSSL_VERSION_LIST 3 OPENSSL_VERSION_PATCH)
if (NOT OPENSSL_VERSION_PATCH STREQUAL "00")
from_hex("${OPENSSL_VERSION_PATCH}" _tmp)
# 96 is the ASCII code of 'a' minus 1
math(EXPR OPENSSL_VERSION_PATCH_ASCII "${_tmp} + 96")
unset(_tmp)
# Once anyone knows how OpenSSL would call the patch versions beyond 'z'
# this should be updated to handle that, too. This has not happened yet
# so it is simply ignored here for now.
string(ASCII "${OPENSSL_VERSION_PATCH_ASCII}" OPENSSL_VERSION_PATCH_STRING)
endif ()
set(OPENSSL_VERSION "${OPENSSL_VERSION_MAJOR}.${OPENSSL_VERSION_MINOR}.${OPENSSL_VERSION_FIX}${OPENSSL_VERSION_PATCH_STRING}")
else ()
# Since OpenSSL 3.0.0, the new version format is MAJOR.MINOR.PATCH and
# a new OPENSSL_VERSION_STR macro contains exactly that
file(STRINGS "${BORINGSSL_SOURCE_DIR}/include/openssl/base.h" OPENSSL_VERSION_STR
REGEX "^#[\t ]*define[\t ]+OPENSSL_VERSION_STR[\t ]+\"([0-9])+\\.([0-9])+\\.([0-9])+\".*")
string(REGEX REPLACE "^.*OPENSSL_VERSION_STR[\t ]+\"([0-9]+\\.[0-9]+\\.[0-9]+)\".*$"
"\\1" OPENSSL_VERSION_STR "${OPENSSL_VERSION_STR}")
set(OPENSSL_VERSION "${OPENSSL_VERSION_STR}")
# Setting OPENSSL_VERSION_MAJOR OPENSSL_VERSION_MINOR and OPENSSL_VERSION_FIX
string(REGEX MATCHALL "([0-9])+" OPENSSL_VERSION_NUMBER "${OPENSSL_VERSION}")
list(POP_FRONT OPENSSL_VERSION_NUMBER
OPENSSL_VERSION_MAJOR
OPENSSL_VERSION_MINOR
OPENSSL_VERSION_FIX)
unset(OPENSSL_VERSION_NUMBER)
unset(OPENSSL_VERSION_STR)
endif ()
endif ()
# Set CMAKE variables so that they can be referenced properly from everywhere
set(OPENSSL_VERSION "${OPENSSL_VERSION}" CACHE INTERNAL "")
set(OPENSSL_IS_BORING_SSL "${OPENSSL_IS_BORING_SSL}" CACHE INTERNAL 0)

View File

@ -630,6 +630,7 @@
M(659, UNKNOWN_STATUS_OF_TRANSACTION) \
M(660, HDFS_ERROR) \
M(661, CANNOT_SEND_SIGNAL) \
M(662, FS_METADATA_ERROR) \
\
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \

View File

@ -616,13 +616,18 @@ void LRUFileCache::remove(const Key & key)
for (auto & [offset, cell] : offsets)
to_remove.push_back(&cell);
bool some_cells_were_skipped = false;
for (auto & cell : to_remove)
{
/// In ordinary case we remove data from cache when it's not used by anyone.
/// But if we have multiple replicated zero-copy tables on the same server
/// it became possible to start removing something from cache when it is used
/// by other "zero-copy" tables. That is why it's not an error.
if (!cell->releasable())
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot remove file from cache because someone reads from it. File segment info: {}",
cell->file_segment->getInfoForLog());
{
some_cells_were_skipped = true;
continue;
}
auto file_segment = cell->file_segment;
if (file_segment)
@ -634,10 +639,13 @@ void LRUFileCache::remove(const Key & key)
auto key_path = getPathInLocalCache(key);
files.erase(key);
if (!some_cells_were_skipped)
{
files.erase(key);
if (fs::exists(key_path))
fs::remove(key_path);
if (fs::exists(key_path))
fs::remove(key_path);
}
}
void LRUFileCache::remove()

View File

@ -0,0 +1,31 @@
#pragma once
#include <string>
#include <memory>
namespace DB
{
/**
* Iterator of directory contents
*/
class IDirectoryIterator
{
public:
/// Iterate to the next file.
virtual void next() = 0;
/// Return `true` if the iterator points to a valid element.
virtual bool isValid() const = 0;
/// Path to the file that the iterator currently points to.
virtual std::string path() const = 0;
/// Name of the file that the iterator currently points to.
virtual std::string name() const = 0;
virtual ~IDirectoryIterator() = default;
};
using DirectoryIteratorPtr = std::unique_ptr<IDirectoryIterator>;
}

View File

@ -83,7 +83,7 @@ void DiskDecorator::moveDirectory(const String & from_path, const String & to_pa
delegate->moveDirectory(from_path, to_path);
}
DiskDirectoryIteratorPtr DiskDecorator::iterateDirectory(const String & path)
DirectoryIteratorPtr DiskDecorator::iterateDirectory(const String & path)
{
return delegate->iterateDirectory(path);
}

View File

@ -28,7 +28,7 @@ public:
void createDirectories(const String & path) override;
void clearDirectory(const String & path) override;
void moveDirectory(const String & from_path, const String & to_path) override;
DiskDirectoryIteratorPtr iterateDirectory(const String & path) override;
DirectoryIteratorPtr iterateDirectory(const String & path) override;
void createFile(const String & path) override;
void moveFile(const String & from_path, const String & to_path) override;
void replaceFile(const String & from_path, const String & to_path) override;
@ -78,7 +78,7 @@ public:
std::vector<String> getRemotePaths(const String & path) const override { return delegate->getRemotePaths(path); }
void getRemotePathsRecursive(const String & path, std::vector<LocalPathWithRemotePaths> & paths_map) override { return delegate->getRemotePathsRecursive(path, paths_map); }
DiskPtr getMetadataDiskIfExistsOrSelf() override { return delegate->getMetadataDiskIfExistsOrSelf(); }
MetadataStoragePtr getMetadataStorage() override { return delegate->getMetadataStorage(); }
std::unordered_map<String, String> getSerializedMetadata(const std::vector<String> & file_paths) const override { return delegate->getSerializedMetadata(file_paths); }

View File

@ -83,7 +83,7 @@ public:
delegate->moveDirectory(wrapped_from_path, wrapped_to_path);
}
DiskDirectoryIteratorPtr iterateDirectory(const String & path) override
DirectoryIteratorPtr iterateDirectory(const String & path) override
{
auto wrapped_path = wrappedPath(path);
return delegate->iterateDirectory(wrapped_path);

View File

@ -173,7 +173,7 @@ private:
};
class DiskLocalDirectoryIterator final : public IDiskDirectoryIterator
class DiskLocalDirectoryIterator final : public IDirectoryIterator
{
public:
DiskLocalDirectoryIterator() = default;
@ -325,7 +325,7 @@ void DiskLocal::moveDirectory(const String & from_path, const String & to_path)
fs::rename(fs::path(disk_path) / from_path, fs::path(disk_path) / to_path);
}
DiskDirectoryIteratorPtr DiskLocal::iterateDirectory(const String & path)
DirectoryIteratorPtr DiskLocal::iterateDirectory(const String & path)
{
fs::path meta_path = fs::path(disk_path) / path;
if (!broken && fs::exists(meta_path) && fs::is_directory(meta_path))

View File

@ -58,7 +58,7 @@ public:
void moveDirectory(const String & from_path, const String & to_path) override;
DiskDirectoryIteratorPtr iterateDirectory(const String & path) override;
DirectoryIteratorPtr iterateDirectory(const String & path) override;
void createFile(const String & path) override;

View File

@ -20,7 +20,7 @@ namespace ErrorCodes
}
class DiskMemoryDirectoryIterator final : public IDiskDirectoryIterator
class DiskMemoryDirectoryIterator final : public IDirectoryIterator
{
public:
explicit DiskMemoryDirectoryIterator(std::vector<fs::path> && dir_file_paths_)
@ -262,7 +262,7 @@ void DiskMemory::moveDirectory(const String & /*from_path*/, const String & /*to
throw Exception("Method moveDirectory is not implemented for memory disks", ErrorCodes::NOT_IMPLEMENTED);
}
DiskDirectoryIteratorPtr DiskMemory::iterateDirectory(const String & path)
DirectoryIteratorPtr DiskMemory::iterateDirectory(const String & path)
{
std::lock_guard lock(mutex);

View File

@ -52,7 +52,7 @@ public:
void moveDirectory(const String & from_path, const String & to_path) override;
DiskDirectoryIteratorPtr iterateDirectory(const String & path) override;
DirectoryIteratorPtr iterateDirectory(const String & path) override;
void createFile(const String & path) override;

View File

@ -171,7 +171,7 @@ void DiskRestartProxy::moveDirectory(const String & from_path, const String & to
DiskDecorator::moveDirectory(from_path, to_path);
}
DiskDirectoryIteratorPtr DiskRestartProxy::iterateDirectory(const String & path)
DirectoryIteratorPtr DiskRestartProxy::iterateDirectory(const String & path)
{
ReadLock lock (mutex);
return DiskDecorator::iterateDirectory(path);

View File

@ -37,7 +37,7 @@ public:
void createDirectories(const String & path) override;
void clearDirectory(const String & path) override;
void moveDirectory(const String & from_path, const String & to_path) override;
DiskDirectoryIteratorPtr iterateDirectory(const String & path) override;
DirectoryIteratorPtr iterateDirectory(const String & path) override;
void createFile(const String & path) override;
void moveFile(const String & from_path, const String & to_path) override;
void replaceFile(const String & from_path, const String & to_path) override;

View File

@ -95,7 +95,7 @@ void DiskWebServer::initialize(const String & uri_path) const
}
class DiskWebServerDirectoryIterator final : public IDiskDirectoryIterator
class DiskWebServerDirectoryIterator final : public IDirectoryIterator
{
public:
explicit DiskWebServerDirectoryIterator(std::vector<fs::path> && dir_file_paths_)
@ -188,7 +188,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskWebServer::readFile(const String & p
}
DiskDirectoryIteratorPtr DiskWebServer::iterateDirectory(const String & path)
DirectoryIteratorPtr DiskWebServer::iterateDirectory(const String & path)
{
std::vector<fs::path> dir_file_paths;
if (files.find(path) == files.end())

View File

@ -96,7 +96,7 @@ public:
bool isDirectory(const String & path) const override;
DiskDirectoryIteratorPtr iterateDirectory(const String & /* path */) override;
DirectoryIteratorPtr iterateDirectory(const String & /* path */) override;
Poco::Timestamp getLastModified(const String &) override { return Poco::Timestamp{}; }

View File

@ -6,6 +6,7 @@
#include <Poco/Logger.h>
#include <Common/logger_useful.h>
#include <Common/setThreadName.h>
#include <Disks/ObjectStorages/MetadataStorageFromDisk.h>
namespace DB
{
@ -99,4 +100,6 @@ SyncGuardPtr IDisk::getDirectorySyncGuard(const String & /* path */) const
return nullptr;
}
MetadataStoragePtr IDisk::getMetadataStorage() { return std::make_shared<MetadataStorageFromDisk>(std::static_pointer_cast<IDisk>(shared_from_this()), ""); }
}

View File

@ -12,6 +12,7 @@
#include <IO/WriteSettings.h>
#include <Disks/ObjectStorages/IObjectStorage.h>
#include <Disks/WriteMode.h>
#include <Disks/DirectoryIterator.h>
#include <memory>
#include <mutex>
@ -39,9 +40,6 @@ namespace ErrorCodes
extern const int NOT_IMPLEMENTED;
}
class IDiskDirectoryIterator;
using DiskDirectoryIteratorPtr = std::unique_ptr<IDiskDirectoryIterator>;
class IReservation;
using ReservationPtr = std::unique_ptr<IReservation>;
using Reservations = std::vector<ReservationPtr>;
@ -49,6 +47,8 @@ using Reservations = std::vector<ReservationPtr>;
class ReadBufferFromFileBase;
class WriteBufferFromFileBase;
class MMappedFileCache;
class IMetadataStorage;
using MetadataStoragePtr = std::shared_ptr<IMetadataStorage>;
/**
@ -92,7 +92,10 @@ class IDisk : public Space
{
public:
/// Default constructor.
explicit IDisk(std::unique_ptr<Executor> executor_ = std::make_unique<SyncExecutor>()) : executor(std::move(executor_)) { }
explicit IDisk(std::unique_ptr<Executor> executor_ = std::make_unique<SyncExecutor>())
: executor(std::move(executor_))
{
}
/// Root path for all files stored on the disk.
/// It's not required to be a local filesystem path.
@ -135,7 +138,7 @@ public:
virtual void moveDirectory(const String & from_path, const String & to_path) = 0;
/// Return iterator to the contents of the specified directory.
virtual DiskDirectoryIteratorPtr iterateDirectory(const String & path) = 0;
virtual DirectoryIteratorPtr iterateDirectory(const String & path) = 0;
/// Return `true` if the specified directory is empty.
bool isDirectoryEmpty(const String & path);
@ -321,7 +324,7 @@ public:
/// Actually it's a part of IDiskRemote implementation but we have so
/// complex hierarchy of disks (with decorators), so we cannot even
/// dynamic_cast some pointer to IDisk to pointer to IDiskRemote.
virtual std::shared_ptr<IDisk> getMetadataDiskIfExistsOrSelf() { return std::static_pointer_cast<IDisk>(shared_from_this()); }
virtual MetadataStoragePtr getMetadataStorage();
/// Very similar case as for getMetadataDiskIfExistsOrSelf(). If disk has "metadata"
/// it will return mapping for each required path: path -> metadata as string.
@ -368,27 +371,6 @@ private:
using DiskPtr = std::shared_ptr<IDisk>;
using Disks = std::vector<DiskPtr>;
/**
* Iterator of directory contents on particular disk.
*/
class IDiskDirectoryIterator
{
public:
/// Iterate to the next file.
virtual void next() = 0;
/// Return `true` if the iterator points to a valid element.
virtual bool isValid() const = 0;
/// Path to the file that the iterator currently points to.
virtual String path() const = 0;
/// Name of the file that the iterator currently points to.
virtual String name() const = 0;
virtual ~IDiskDirectoryIterator() = default;
};
/**
* Information about reserved size on particular disk.
*/

View File

@ -11,6 +11,7 @@
#include <Disks/ObjectStorages/AzureBlobStorage/AzureBlobStorageAuth.h>
#include <Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.h>
#include <Disks/ObjectStorages/MetadataStorageFromDisk.h>
namespace DB
{
@ -82,11 +83,13 @@ void registerDiskAzureBlobStorage(DiskFactory & factory)
uint64_t copy_thread_pool_size = config.getUInt(config_prefix + ".thread_pool_size", 16);
bool send_metadata = config.getBool(config_prefix + ".send_metadata", false);
auto metadata_storage = std::make_shared<MetadataStorageFromDisk>(metadata_disk, "");
std::shared_ptr<IDisk> azure_blob_storage_disk = std::make_shared<DiskObjectStorage>(
name,
/* no namespaces */"",
"DiskAzureBlobStorage",
metadata_disk,
std::move(metadata_storage),
std::move(azure_object_storage),
DiskType::AzureBlobStorage,
send_metadata,

View File

@ -14,7 +14,7 @@
#include <Common/filesystemHelpers.h>
#include <Disks/IO/ThreadPoolRemoteFSReader.h>
#include <Common/FileCache.h>
#include <Disks/ObjectStorages/DiskObjectStorageMetadataHelper.h>
#include <Disks/ObjectStorages/DiskObjectStorageRemoteMetadataRestoreHelper.h>
#include <Poco/Util/AbstractConfiguration.h>
namespace DB
@ -29,6 +29,7 @@ namespace ErrorCodes
extern const int BAD_FILE_TYPE;
extern const int ATTEMPT_TO_READ_AFTER_EOF;
extern const int CANNOT_READ_ALL_DATA;
extern const int CANNOT_OPEN_FILE;
}
static String revisionToString(UInt64 revision)
@ -89,7 +90,7 @@ DiskObjectStorage::DiskObjectStorage(
const String & name_,
const String & remote_fs_root_path_,
const String & log_name,
DiskPtr metadata_disk_,
MetadataStoragePtr && metadata_storage_,
ObjectStoragePtr && object_storage_,
DiskType disk_type_,
bool send_metadata_,
@ -98,77 +99,22 @@ DiskObjectStorage::DiskObjectStorage(
, name(name_)
, remote_fs_root_path(remote_fs_root_path_)
, log (&Poco::Logger::get(log_name))
, metadata_disk(metadata_disk_)
, disk_type(disk_type_)
, metadata_storage(std::move(metadata_storage_))
, object_storage(std::move(object_storage_))
, send_metadata(send_metadata_)
, metadata_helper(std::make_unique<DiskObjectStorageMetadataHelper>(this, ReadSettings{}))
, metadata_helper(std::make_unique<DiskObjectStorageRemoteMetadataRestoreHelper>(this, ReadSettings{}))
{}
DiskObjectStorage::Metadata DiskObjectStorage::readMetadataUnlocked(const String & path, std::shared_lock<std::shared_mutex> &) const
{
return Metadata::readMetadata(remote_fs_root_path, metadata_disk, path);
}
DiskObjectStorage::Metadata DiskObjectStorage::readMetadata(const String & path) const
{
std::shared_lock lock(metadata_mutex);
return readMetadataUnlocked(path, lock);
}
DiskObjectStorage::Metadata DiskObjectStorage::readUpdateAndStoreMetadata(const String & path, bool sync, DiskObjectStorage::MetadataUpdater updater)
{
std::unique_lock lock(metadata_mutex);
return Metadata::readUpdateAndStoreMetadata(remote_fs_root_path, metadata_disk, path, sync, updater);
}
void DiskObjectStorage::readUpdateStoreMetadataAndRemove(const String & path, bool sync, DiskObjectStorage::MetadataUpdater updater)
{
std::unique_lock lock(metadata_mutex);
Metadata::readUpdateStoreMetadataAndRemove(remote_fs_root_path, metadata_disk, path, sync, updater);
}
DiskObjectStorage::Metadata DiskObjectStorage::readOrCreateUpdateAndStoreMetadata(const String & path, WriteMode mode, bool sync, DiskObjectStorage::MetadataUpdater updater)
{
if (mode == WriteMode::Rewrite || !metadata_disk->exists(path))
{
std::unique_lock lock(metadata_mutex);
return Metadata::createUpdateAndStoreMetadata(remote_fs_root_path, metadata_disk, path, sync, updater);
}
else
{
return Metadata::readUpdateAndStoreMetadata(remote_fs_root_path, metadata_disk, path, sync, updater);
}
}
DiskObjectStorage::Metadata DiskObjectStorage::createAndStoreMetadata(const String & path, bool sync)
{
return Metadata::createAndStoreMetadata(remote_fs_root_path, metadata_disk, path, sync);
}
DiskObjectStorage::Metadata DiskObjectStorage::createUpdateAndStoreMetadata(const String & path, bool sync, DiskObjectStorage::MetadataUpdater updater)
{
return Metadata::createUpdateAndStoreMetadata(remote_fs_root_path, metadata_disk, path, sync, updater);
}
std::vector<String> DiskObjectStorage::getRemotePaths(const String & local_path) const
{
auto metadata = readMetadata(local_path);
std::vector<String> remote_paths;
for (const auto & [remote_path, _] : metadata.remote_fs_objects)
remote_paths.push_back(fs::path(metadata.remote_fs_root_path) / remote_path);
return remote_paths;
return metadata_storage->getRemotePaths(local_path);
}
void DiskObjectStorage::getRemotePathsRecursive(const String & local_path, std::vector<LocalPathWithRemotePaths> & paths_map)
{
/// Protect against concurrent delition of files (for example because of a merge).
if (metadata_disk->isFile(local_path))
if (metadata_storage->isFile(local_path))
{
try
{
@ -188,7 +134,7 @@ void DiskObjectStorage::getRemotePathsRecursive(const String & local_path, std::
}
else
{
DiskDirectoryIteratorPtr it;
DirectoryIteratorPtr it;
try
{
it = iterateDirectory(local_path);
@ -216,24 +162,26 @@ void DiskObjectStorage::getRemotePathsRecursive(const String & local_path, std::
bool DiskObjectStorage::exists(const String & path) const
{
return metadata_disk->exists(path);
return metadata_storage->exists(path);
}
bool DiskObjectStorage::isFile(const String & path) const
{
return metadata_disk->isFile(path);
return metadata_storage->isFile(path);
}
void DiskObjectStorage::createFile(const String & path)
{
createAndStoreMetadata(path, false);
auto tx = metadata_storage->createTransaction();
tx->createEmptyMetadataFile(path);
tx->commit();
}
size_t DiskObjectStorage::getFileSize(const String & path) const
{
return readMetadata(path).total_size;
return metadata_storage->getFileSize(path);
}
void DiskObjectStorage::moveFile(const String & from_path, const String & to_path, bool should_send_metadata)
@ -241,6 +189,9 @@ void DiskObjectStorage::moveFile(const String & from_path, const String & to_pat
if (exists(to_path))
throw Exception("File already exists: " + to_path, ErrorCodes::FILE_ALREADY_EXISTS);
if (!exists(from_path))
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "File {} doesn't exist, cannot move", to_path);
if (should_send_metadata)
{
auto revision = metadata_helper->revision_counter + 1;
@ -253,10 +204,9 @@ void DiskObjectStorage::moveFile(const String & from_path, const String & to_pat
metadata_helper->createFileOperationObject("rename", revision, object_metadata);
}
{
std::unique_lock lock(metadata_mutex);
metadata_disk->moveFile(from_path, to_path);
}
auto tx = metadata_storage->createTransaction();
tx->moveFile(from_path, to_path);
tx->commit();
}
void DiskObjectStorage::moveFile(const String & from_path, const String & to_path)
@ -268,10 +218,13 @@ void DiskObjectStorage::replaceFile(const String & from_path, const String & to_
{
if (exists(to_path))
{
const String tmp_path = to_path + ".old";
moveFile(to_path, tmp_path);
moveFile(from_path, to_path);
removeFile(tmp_path);
auto blobs = metadata_storage->getRemotePaths(to_path);
auto tx = metadata_storage->createTransaction();
tx->replaceFile(from_path, to_path);
tx->commit();
removeFromRemoteFS(blobs);
}
else
moveFile(from_path, to_path);
@ -293,32 +246,21 @@ void DiskObjectStorage::removeFromRemoteFS(const std::vector<String> & paths)
UInt32 DiskObjectStorage::getRefCount(const String & path) const
{
return readMetadata(path).ref_count;
return metadata_storage->getHardlinkCount(path);
}
std::unordered_map<String, String> DiskObjectStorage::getSerializedMetadata(const std::vector<String> & file_paths) const
{
std::unordered_map<String, String> metadatas;
std::shared_lock lock(metadata_mutex);
for (const auto & path : file_paths)
{
DiskObjectStorage::Metadata metadata = readMetadataUnlocked(path, lock);
metadata.ref_count = 0;
metadatas[path] = metadata.serializeToString();
}
return metadatas;
return metadata_storage->getSerializedMetadata(file_paths);
}
String DiskObjectStorage::getUniqueId(const String & path) const
{
LOG_TRACE(log, "Remote path: {}, Path: {}", remote_fs_root_path, path);
auto metadata = readMetadata(path);
String id;
if (!metadata.remote_fs_objects.empty())
id = metadata.remote_fs_root_path + metadata.remote_fs_objects[0].relative_path;
auto blobs_paths = metadata_storage->getRemotePaths(path);
if (!blobs_paths.empty())
id = blobs_paths[0];
return id;
}
@ -337,8 +279,6 @@ bool DiskObjectStorage::checkUniqueId(const String & id) const
void DiskObjectStorage::createHardLink(const String & src_path, const String & dst_path, bool should_send_metadata)
{
readUpdateAndStoreMetadata(src_path, false, [](Metadata & metadata) { metadata.ref_count++; return true; });
if (should_send_metadata && !dst_path.starts_with("shadow/"))
{
auto revision = metadata_helper->revision_counter + 1;
@ -351,7 +291,9 @@ void DiskObjectStorage::createHardLink(const String & src_path, const String & d
}
/// Create FS hardlink to metadata file.
metadata_disk->createHardLink(src_path, dst_path);
auto tx = metadata_storage->createTransaction();
tx->createHardLink(src_path, dst_path);
tx->commit();
}
void DiskObjectStorage::createHardLink(const String & src_path, const String & dst_path)
@ -364,25 +306,31 @@ void DiskObjectStorage::setReadOnly(const String & path)
{
/// We should store read only flag inside metadata file (instead of using FS flag),
/// because we modify metadata file when create hard-links from it.
readUpdateAndStoreMetadata(path, false, [](Metadata & metadata) { metadata.read_only = true; return true; });
auto tx = metadata_storage->createTransaction();
tx->setReadOnly(path);
tx->commit();
}
bool DiskObjectStorage::isDirectory(const String & path) const
{
return metadata_disk->isDirectory(path);
return metadata_storage->isDirectory(path);
}
void DiskObjectStorage::createDirectory(const String & path)
{
metadata_disk->createDirectory(path);
auto tx = metadata_storage->createTransaction();
tx->createDirectory(path);
tx->commit();
}
void DiskObjectStorage::createDirectories(const String & path)
{
metadata_disk->createDirectories(path);
auto tx = metadata_storage->createTransaction();
tx->createDicrectoryRecursive(path);
tx->commit();
}
@ -396,13 +344,15 @@ void DiskObjectStorage::clearDirectory(const String & path)
void DiskObjectStorage::removeDirectory(const String & path)
{
metadata_disk->removeDirectory(path);
auto tx = metadata_storage->createTransaction();
tx->removeDirectory(path);
tx->commit();
}
DiskDirectoryIteratorPtr DiskObjectStorage::iterateDirectory(const String & path)
DirectoryIteratorPtr DiskObjectStorage::iterateDirectory(const String & path)
{
return metadata_disk->iterateDirectory(path);
return metadata_storage->iterateDirectory(path);
}
@ -415,13 +365,15 @@ void DiskObjectStorage::listFiles(const String & path, std::vector<String> & fil
void DiskObjectStorage::setLastModified(const String & path, const Poco::Timestamp & timestamp)
{
metadata_disk->setLastModified(path, timestamp);
auto tx = metadata_storage->createTransaction();
tx->setLastModified(path, timestamp);
tx->commit();
}
Poco::Timestamp DiskObjectStorage::getLastModified(const String & path)
{
return metadata_disk->getLastModified(path);
return metadata_storage->getLastModified(path);
}
Poco::Timestamp DiskObjectStorage::getLastChanged(const String & path)
@ -431,51 +383,45 @@ Poco::Timestamp DiskObjectStorage::getLastChanged(const String & path)
void DiskObjectStorage::removeMetadata(const String & path, std::vector<String> & paths_to_remove)
{
LOG_TRACE(log, "Remove file by path: {}", backQuote(metadata_disk->getPath() + path));
LOG_TRACE(log, "Remove file by path: {}", backQuote(metadata_storage->getPath() + path));
if (!metadata_disk->exists(path))
if (!metadata_storage->exists(path))
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "Metadata path '{}' doesn't exist", path);
if (!metadata_disk->isFile(path))
if (!metadata_storage->isFile(path))
throw Exception(ErrorCodes::BAD_FILE_TYPE, "Path '{}' is not a regular file", path);
try
{
auto metadata_updater = [&paths_to_remove, this] (Metadata & metadata)
uint32_t hardlink_count = metadata_storage->getHardlinkCount(path);
auto remote_objects = metadata_storage->getRemotePaths(path);
auto tx = metadata_storage->createTransaction();
tx->unlinkMetadata(path);
tx->commit();
if (hardlink_count == 0)
{
if (metadata.ref_count == 0)
{
for (const auto & [remote_fs_object_path, _] : metadata.remote_fs_objects)
{
String object_path = fs::path(remote_fs_root_path) / remote_fs_object_path;
paths_to_remove.push_back(object_path);
object_storage->removeFromCache(object_path);
}
return false;
}
else /// In other case decrement number of references, save metadata and delete hardlink.
{
--metadata.ref_count;
}
return true;
};
readUpdateStoreMetadataAndRemove(path, false, metadata_updater);
/// If there is no references - delete content from remote FS.
paths_to_remove = remote_objects;
for (const auto & path_to_remove : paths_to_remove)
object_storage->removeFromCache(path_to_remove);
}
}
catch (const Exception & e)
{
/// If it's impossible to read meta - just remove it from FS.
if (e.code() == ErrorCodes::UNKNOWN_FORMAT)
if (e.code() == ErrorCodes::UNKNOWN_FORMAT
|| e.code() == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF
|| e.code() == ErrorCodes::CANNOT_READ_ALL_DATA
|| e.code() == ErrorCodes::CANNOT_OPEN_FILE)
{
LOG_WARNING(log,
"Metadata file {} can't be read by reason: {}. Removing it forcibly.",
backQuote(path), e.nested() ? e.nested()->message() : e.message());
LOG_INFO(log, "Failed to read metadata file {} before removal because it's incomplete or empty. "
"It's Ok and can happen after operation interruption (like metadata fetch), so removing as is", path);
std::unique_lock lock(metadata_mutex);
metadata_disk->removeFile(path);
auto tx = metadata_storage->createTransaction();
tx->unlinkFile(path);
tx->commit();
}
else
throw;
@ -487,7 +433,7 @@ void DiskObjectStorage::removeMetadataRecursive(const String & path, std::unorde
{
checkStackSize(); /// This is needed to prevent stack overflow in case of cyclic symlinks.
if (metadata_disk->isFile(path))
if (metadata_storage->isFile(path))
{
removeMetadata(path, paths_to_remove[path]);
}
@ -496,7 +442,9 @@ void DiskObjectStorage::removeMetadataRecursive(const String & path, std::unorde
for (auto it = iterateDirectory(path); it->isValid(); it->next())
removeMetadataRecursive(it->path(), paths_to_remove);
metadata_disk->removeDirectory(path);
auto tx = metadata_storage->createTransaction();
tx->removeDirectory(path);
tx->commit();
}
}
@ -530,7 +478,7 @@ ReservationPtr DiskObjectStorage::reserve(UInt64 bytes)
void DiskObjectStorage::removeSharedFileIfExists(const String & path, bool delete_metadata_only)
{
std::vector<String> paths_to_remove;
if (metadata_disk->exists(path))
if (metadata_storage->exists(path))
{
removeMetadata(path, paths_to_remove);
if (!delete_metadata_only)
@ -557,7 +505,7 @@ void DiskObjectStorage::removeSharedRecursive(const String & path, bool keep_all
}
}
std::optional<UInt64> DiskObjectStorage::tryReserve(UInt64 bytes)
std::optional<UInt64> DiskObjectStorage::tryReserve(UInt64 bytes)
{
std::lock_guard lock(reservation_mutex);
@ -589,8 +537,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskObjectStorage::readFile(
std::optional<size_t> read_hint,
std::optional<size_t> file_size) const
{
auto metadata = readMetadata(path);
return object_storage->readObjects(remote_fs_root_path, metadata.remote_fs_objects, settings, read_hint, file_size);
return object_storage->readObjects(remote_fs_root_path, metadata_storage->getBlobs(path), settings, read_hint, file_size);
}
std::unique_ptr<WriteBufferFromFileBase> DiskObjectStorage::writeFile(
@ -612,10 +559,15 @@ std::unique_ptr<WriteBufferFromFileBase> DiskObjectStorage::writeFile(
blob_name = "r" + revisionToString(revision) + "-file-" + blob_name;
}
auto create_metadata_callback = [this, path, blob_name, mode] (size_t count)
auto create_metadata_callback = [this, mode, path, blob_name] (size_t count)
{
readOrCreateUpdateAndStoreMetadata(path, mode, false,
[blob_name, count] (DiskObjectStorage::Metadata & metadata) { metadata.addObject(blob_name, count); return true; });
auto tx = metadata_storage->createTransaction();
if (mode == WriteMode::Rewrite)
tx->createMetadataFile(path, blob_name, count);
else
tx->addBlobToMetadata(path, blob_name, count);
tx->commit();
};
/// We always use mode Rewrite because we simulate append using metadata and different files
@ -641,7 +593,7 @@ void DiskObjectStorage::restoreMetadataIfNeeded(const Poco::Util::AbstractConfig
{
metadata_helper->restore(config, config_prefix, context);
if (metadata_helper->readSchemaVersion(object_storage.get(), remote_fs_root_path) < DiskObjectStorageMetadataHelper::RESTORABLE_SCHEMA_VERSION)
if (metadata_helper->readSchemaVersion(object_storage.get(), remote_fs_root_path) < DiskObjectStorageRemoteMetadataRestoreHelper::RESTORABLE_SCHEMA_VERSION)
metadata_helper->migrateToRestorableSchema();
metadata_helper->findLastRevision();

View File

@ -2,8 +2,8 @@
#include <Disks/IDisk.h>
#include <Disks/ObjectStorages/IObjectStorage.h>
#include <Disks/ObjectStorages/DiskObjectStorageMetadataHelper.h>
#include <Disks/ObjectStorages/DiskObjectStorageMetadata.h>
#include <Disks/ObjectStorages/DiskObjectStorageRemoteMetadataRestoreHelper.h>
#include <Disks/ObjectStorages/IMetadataStorage.h>
#include <re2/re2.h>
namespace CurrentMetrics
@ -23,14 +23,14 @@ class DiskObjectStorage : public IDisk
{
friend class DiskObjectStorageReservation;
friend class DiskObjectStorageMetadataHelper;
friend class DiskObjectStorageRemoteMetadataRestoreHelper;
public:
DiskObjectStorage(
const String & name_,
const String & remote_fs_root_path_,
const String & log_name,
DiskPtr metadata_disk_,
MetadataStoragePtr && metadata_storage_,
ObjectStoragePtr && object_storage_,
DiskType disk_type_,
bool send_metadata_,
@ -42,12 +42,9 @@ public:
bool supportParallelWrite() const override { return true; }
using Metadata = DiskObjectStorageMetadata;
using MetadataUpdater = std::function<bool(Metadata & metadata)>;
const String & getName() const override { return name; }
const String & getPath() const override { return metadata_disk->getPath(); }
const String & getPath() const override { return metadata_storage->getPath(); }
std::vector<String> getRemotePaths(const String & local_path) const override;
@ -58,20 +55,6 @@ public:
return object_storage->getCacheBasePath();
}
/// Methods for working with metadata. For some operations (like hardlink
/// creation) metadata can be updated concurrently from multiple threads
/// (file actually rewritten on disk). So additional RW lock is required for
/// metadata read and write, but not for create new metadata.
Metadata readMetadata(const String & path) const;
Metadata readMetadataUnlocked(const String & path, std::shared_lock<std::shared_mutex> &) const;
Metadata readUpdateAndStoreMetadata(const String & path, bool sync, MetadataUpdater updater);
void readUpdateStoreMetadataAndRemove(const String & path, bool sync, MetadataUpdater updater);
Metadata readOrCreateUpdateAndStoreMetadata(const String & path, WriteMode mode, bool sync, MetadataUpdater updater);
Metadata createAndStoreMetadata(const String & path, bool sync);
Metadata createUpdateAndStoreMetadata(const String & path, bool sync, MetadataUpdater updater);
UInt64 getTotalSpace() const override { return std::numeric_limits<UInt64>::max(); }
UInt64 getAvailableSpace() const override { return std::numeric_limits<UInt64>::max(); }
@ -108,7 +91,7 @@ public:
void removeFromRemoteFS(const std::vector<String> & paths);
DiskPtr getMetadataDiskIfExistsOrSelf() override { return metadata_disk; }
MetadataStoragePtr getMetadataStorage() override { return metadata_storage; }
UInt32 getRefCount(const String & path) const override;
@ -141,7 +124,7 @@ public:
void removeDirectory(const String & path) override;
DiskDirectoryIteratorPtr iterateDirectory(const String & path) override;
DirectoryIteratorPtr iterateDirectory(const String & path) override;
void setLastModified(const String & path, const Poco::Timestamp & timestamp) override;
@ -182,25 +165,24 @@ private:
const String name;
const String remote_fs_root_path;
Poco::Logger * log;
DiskPtr metadata_disk;
const DiskType disk_type;
MetadataStoragePtr metadata_storage;
ObjectStoragePtr object_storage;
UInt64 reserved_bytes = 0;
UInt64 reservation_count = 0;
std::mutex reservation_mutex;
mutable std::shared_mutex metadata_mutex;
void removeMetadata(const String & path, std::vector<String> & paths_to_remove);
void removeMetadataRecursive(const String & path, std::unordered_map<String, std::vector<String>> & paths_to_remove);
std::optional<UInt64> tryReserve(UInt64 bytes);
bool send_metadata;
const bool send_metadata;
std::unique_ptr<DiskObjectStorageMetadataHelper> metadata_helper;
std::unique_ptr<DiskObjectStorageRemoteMetadataRestoreHelper> metadata_helper;
};
class DiskObjectStorageReservation final : public IReservation

View File

@ -1,5 +1,7 @@
#include <Disks/ObjectStorages/DiskObjectStorageMetadata.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
@ -9,168 +11,66 @@ namespace DB
namespace ErrorCodes
{
extern const int UNKNOWN_FORMAT;
extern const int PATH_ACCESS_DENIED;
extern const int FILE_DOESNT_EXIST;
extern const int ATTEMPT_TO_READ_AFTER_EOF;
extern const int CANNOT_READ_ALL_DATA;
extern const int CANNOT_OPEN_FILE;
}
DiskObjectStorageMetadata DiskObjectStorageMetadata::readMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_)
void DiskObjectStorageMetadata::deserialize(ReadBuffer & buf)
{
DiskObjectStorageMetadata result(remote_fs_root_path_, metadata_disk_, metadata_file_path_);
result.load();
return result;
}
DiskObjectStorageMetadata DiskObjectStorageMetadata::createAndStoreMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync)
{
DiskObjectStorageMetadata result(remote_fs_root_path_, metadata_disk_, metadata_file_path_);
result.save(sync);
return result;
}
DiskObjectStorageMetadata DiskObjectStorageMetadata::readUpdateAndStoreMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, DiskObjectStorageMetadataUpdater updater)
{
DiskObjectStorageMetadata result(remote_fs_root_path_, metadata_disk_, metadata_file_path_);
result.load();
if (updater(result))
result.save(sync);
return result;
}
DiskObjectStorageMetadata DiskObjectStorageMetadata::createUpdateAndStoreMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, DiskObjectStorageMetadataUpdater updater)
{
DiskObjectStorageMetadata result(remote_fs_root_path_, metadata_disk_, metadata_file_path_);
updater(result);
result.save(sync);
return result;
}
void DiskObjectStorageMetadata::readUpdateStoreMetadataAndRemove(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, DiskObjectStorageMetadataUpdater updater)
{
/// Very often we are deleting metadata from some unfinished operation (like fetch of metadata)
/// in this case metadata file can be incomplete/empty and so on. It's ok to remove it in this case
/// because we cannot do anything better.
try
{
DiskObjectStorageMetadata metadata(remote_fs_root_path_, metadata_disk_, metadata_file_path_);
metadata.load();
if (updater(metadata))
metadata.save(sync);
metadata_disk_->removeFile(metadata_file_path_);
}
catch (Exception & ex)
{
/// If we have some broken half-empty file just remove it
if (ex.code() == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF
|| ex.code() == ErrorCodes::CANNOT_READ_ALL_DATA
|| ex.code() == ErrorCodes::CANNOT_OPEN_FILE)
{
LOG_INFO(&Poco::Logger::get("ObjectStorageMetadata"), "Failed to read metadata file {} before removal because it's incomplete or empty. "
"It's Ok and can happen after operation interruption (like metadata fetch), so removing as is", metadata_file_path_);
metadata_disk_->removeFile(metadata_file_path_);
}
/// If file already removed, than nothing to do
if (ex.code() == ErrorCodes::FILE_DOESNT_EXIST)
return;
throw;
}
}
DiskObjectStorageMetadata DiskObjectStorageMetadata::createAndStoreMetadataIfNotExists(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, bool overwrite)
{
if (overwrite || !metadata_disk_->exists(metadata_file_path_))
{
return createAndStoreMetadata(remote_fs_root_path_, metadata_disk_, metadata_file_path_, sync);
}
else
{
auto result = readMetadata(remote_fs_root_path_, metadata_disk_, metadata_file_path_);
if (result.read_only)
throw Exception("File is read-only: " + metadata_file_path_, ErrorCodes::PATH_ACCESS_DENIED);
return result;
}
}
void DiskObjectStorageMetadata::load()
{
const ReadSettings read_settings;
auto buf = metadata_disk->readFile(metadata_file_path, read_settings, 1024); /* reasonable buffer size for small file */
UInt32 version;
readIntText(version, *buf);
readIntText(version, buf);
if (version < VERSION_ABSOLUTE_PATHS || version > VERSION_READ_ONLY_FLAG)
throw Exception(
ErrorCodes::UNKNOWN_FORMAT,
"Unknown metadata file version. Path: {}. Version: {}. Maximum expected version: {}",
metadata_disk->getPath() + metadata_file_path, toString(version), toString(VERSION_READ_ONLY_FLAG));
common_metadata_path + metadata_file_path, toString(version), toString(VERSION_READ_ONLY_FLAG));
assertChar('\n', *buf);
assertChar('\n', buf);
UInt32 remote_fs_objects_count;
readIntText(remote_fs_objects_count, *buf);
assertChar('\t', *buf);
readIntText(total_size, *buf);
assertChar('\n', *buf);
readIntText(remote_fs_objects_count, buf);
assertChar('\t', buf);
readIntText(total_size, buf);
assertChar('\n', buf);
remote_fs_objects.resize(remote_fs_objects_count);
for (size_t i = 0; i < remote_fs_objects_count; ++i)
{
String remote_fs_object_path;
size_t remote_fs_object_size;
readIntText(remote_fs_object_size, *buf);
assertChar('\t', *buf);
readEscapedString(remote_fs_object_path, *buf);
readIntText(remote_fs_object_size, buf);
assertChar('\t', buf);
readEscapedString(remote_fs_object_path, buf);
if (version == VERSION_ABSOLUTE_PATHS)
{
if (!remote_fs_object_path.starts_with(remote_fs_root_path))
throw Exception(ErrorCodes::UNKNOWN_FORMAT,
"Path in metadata does not correspond to root path. Path: {}, root path: {}, disk path: {}",
remote_fs_object_path, remote_fs_root_path, metadata_disk->getPath());
remote_fs_object_path, remote_fs_root_path, common_metadata_path);
remote_fs_object_path = remote_fs_object_path.substr(remote_fs_root_path.size());
}
assertChar('\n', *buf);
assertChar('\n', buf);
remote_fs_objects[i].relative_path = remote_fs_object_path;
remote_fs_objects[i].bytes_size = remote_fs_object_size;
}
readIntText(ref_count, *buf);
assertChar('\n', *buf);
readIntText(ref_count, buf);
assertChar('\n', buf);
if (version >= VERSION_READ_ONLY_FLAG)
{
readBoolText(read_only, *buf);
assertChar('\n', *buf);
readBoolText(read_only, buf);
assertChar('\n', buf);
}
}
/// Load metadata by path or create empty if `create` flag is set.
DiskObjectStorageMetadata::DiskObjectStorageMetadata(
const String & remote_fs_root_path_,
DiskPtr metadata_disk_,
const String & metadata_file_path_)
: remote_fs_root_path(remote_fs_root_path_)
, metadata_file_path(metadata_file_path_)
, metadata_disk(metadata_disk_)
, total_size(0), ref_count(0)
void DiskObjectStorageMetadata::deserializeFromString(const std::string & data)
{
ReadBufferFromString buf(data);
deserialize(buf);
}
void DiskObjectStorageMetadata::addObject(const String & path, size_t size)
{
total_size += size;
remote_fs_objects.emplace_back(path, size);
}
void DiskObjectStorageMetadata::saveToBuffer(WriteBuffer & buf, bool sync)
void DiskObjectStorageMetadata::serialize(WriteBuffer & buf, bool sync) const
{
writeIntText(VERSION_RELATIVE_PATHS, buf);
writeChar('\n', buf);
@ -197,21 +97,30 @@ void DiskObjectStorageMetadata::saveToBuffer(WriteBuffer & buf, bool sync)
buf.finalize();
if (sync)
buf.sync();
}
/// Fsync metadata file if 'sync' flag is set.
void DiskObjectStorageMetadata::save(bool sync)
std::string DiskObjectStorageMetadata::serializeToString() const
{
auto buf = metadata_disk->writeFile(metadata_file_path, 1024);
saveToBuffer(*buf, sync);
WriteBufferFromOwnString result;
serialize(result, false);
return result.str();
}
std::string DiskObjectStorageMetadata::serializeToString()
/// Load metadata by path or create empty if `create` flag is set.
DiskObjectStorageMetadata::DiskObjectStorageMetadata(
const std::string & common_metadata_path_,
const String & remote_fs_root_path_,
const String & metadata_file_path_)
: common_metadata_path(common_metadata_path_)
, remote_fs_root_path(remote_fs_root_path_)
, metadata_file_path(metadata_file_path_)
{
WriteBufferFromOwnString write_buf;
saveToBuffer(write_buf, false);
return write_buf.str();
}
void DiskObjectStorageMetadata::addObject(const String & path, size_t size)
{
total_size += size;
remote_fs_objects.emplace_back(path, size);
}

View File

@ -1,30 +1,32 @@
#pragma once
#include <Disks/IDisk.h>
#include <Disks/ObjectStorages/IMetadataStorage.h>
#include <Core/Types.h>
namespace DB
{
/// Metadata for DiskObjectStorage, stored on local disk
struct DiskObjectStorageMetadata
{
using Updater = std::function<bool(DiskObjectStorageMetadata & metadata)>;
private:
/// Metadata file version.
static constexpr UInt32 VERSION_ABSOLUTE_PATHS = 1;
static constexpr UInt32 VERSION_RELATIVE_PATHS = 2;
static constexpr UInt32 VERSION_READ_ONLY_FLAG = 3;
static constexpr uint32_t VERSION_ABSOLUTE_PATHS = 1;
static constexpr uint32_t VERSION_RELATIVE_PATHS = 2;
static constexpr uint32_t VERSION_READ_ONLY_FLAG = 3;
const std::string & common_metadata_path;
/// Remote FS objects paths and their sizes.
std::vector<BlobPathWithSize> remote_fs_objects;
/// URI
const String & remote_fs_root_path;
const std::string & remote_fs_root_path;
/// Relative path to metadata file on local FS.
const String metadata_file_path;
DiskPtr metadata_disk;
const std::string metadata_file_path;
/// Total size of all remote FS (S3, HDFS) objects.
size_t total_size = 0;
@ -33,36 +35,73 @@ struct DiskObjectStorageMetadata
///
/// FIXME: Why we are tracking it explicetly, without
/// info from filesystem????
UInt32 ref_count = 0;
uint32_t ref_count = 0;
/// Flag indicates that file is read only.
bool read_only = false;
public:
DiskObjectStorageMetadata(
const String & remote_fs_root_path_,
DiskPtr metadata_disk_,
const String & metadata_file_path_);
const std::string & common_metadata_path_,
const std::string & remote_fs_root_path_,
const std::string & metadata_file_path_);
void addObject(const String & path, size_t size);
void addObject(const std::string & path, size_t size);
static DiskObjectStorageMetadata readMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_);
static DiskObjectStorageMetadata readUpdateAndStoreMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, Updater updater);
static void readUpdateStoreMetadataAndRemove(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, Updater updater);
void deserialize(ReadBuffer & buf);
void deserializeFromString(const std::string & data);
static DiskObjectStorageMetadata createAndStoreMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync);
static DiskObjectStorageMetadata createUpdateAndStoreMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, Updater updater);
static DiskObjectStorageMetadata createAndStoreMetadataIfNotExists(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, bool overwrite);
void serialize(WriteBuffer & buf, bool sync) const;
std::string serializeToString() const;
/// Serialize metadata to string (very same with saveToBuffer)
std::string serializeToString();
std::string getBlobsCommonPrefix() const
{
return remote_fs_root_path;
}
std::vector<BlobPathWithSize> getBlobs() const
{
return remote_fs_objects;
}
bool isReadOnly() const
{
return read_only;
}
uint32_t getRefCount() const
{
return ref_count;
}
uint64_t getTotalSizeBytes() const
{
return total_size;
}
void incrementRefCount()
{
++ref_count;
}
void decrementRefCount()
{
--ref_count;
}
void resetRefCount()
{
ref_count = 0;
}
void setReadOnly()
{
read_only = true;
}
private:
/// Fsync metadata file if 'sync' flag is set.
void save(bool sync = false);
void saveToBuffer(WriteBuffer & buffer, bool sync);
void load();
};
using DiskObjectStorageMetadataUpdater = std::function<bool(DiskObjectStorageMetadata & metadata)>;
using DiskObjectStorageMetadataPtr = std::unique_ptr<DiskObjectStorageMetadata>;
}

View File

@ -1,4 +1,4 @@
#include <Disks/ObjectStorages/DiskObjectStorageMetadataHelper.h>
#include <Disks/ObjectStorages/DiskObjectStorageRemoteMetadataRestoreHelper.h>
#include <Disks/ObjectStorages/DiskObjectStorage.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
@ -21,7 +21,7 @@ static String revisionToString(UInt64 revision)
return std::bitset<64>(revision).to_string();
}
void DiskObjectStorageMetadataHelper::createFileOperationObject(const String & operation_name, UInt64 revision, const ObjectAttributes & metadata) const
void DiskObjectStorageRemoteMetadataRestoreHelper::createFileOperationObject(const String & operation_name, UInt64 revision, const ObjectAttributes & metadata) const
{
const String path = disk->remote_fs_root_path + "operations/r" + revisionToString(revision) + operation_log_suffix + "-" + operation_name;
auto buf = disk->object_storage->writeObject(path, WriteMode::Rewrite, metadata);
@ -29,7 +29,7 @@ void DiskObjectStorageMetadataHelper::createFileOperationObject(const String & o
buf->finalize();
}
void DiskObjectStorageMetadataHelper::findLastRevision()
void DiskObjectStorageRemoteMetadataRestoreHelper::findLastRevision()
{
/// Construct revision number from high to low bits.
String revision;
@ -51,7 +51,7 @@ void DiskObjectStorageMetadataHelper::findLastRevision()
LOG_INFO(disk->log, "Found last revision number {} for disk {}", revision_counter, disk->name);
}
int DiskObjectStorageMetadataHelper::readSchemaVersion(IObjectStorage * object_storage, const String & source_path)
int DiskObjectStorageRemoteMetadataRestoreHelper::readSchemaVersion(IObjectStorage * object_storage, const String & source_path)
{
const std::string path = source_path + SCHEMA_VERSION_OBJECT;
int version = 0;
@ -64,7 +64,7 @@ int DiskObjectStorageMetadataHelper::readSchemaVersion(IObjectStorage * object_s
return version;
}
void DiskObjectStorageMetadataHelper::saveSchemaVersion(const int & version) const
void DiskObjectStorageRemoteMetadataRestoreHelper::saveSchemaVersion(const int & version) const
{
auto path = disk->remote_fs_root_path + SCHEMA_VERSION_OBJECT;
@ -74,18 +74,17 @@ void DiskObjectStorageMetadataHelper::saveSchemaVersion(const int & version) con
}
void DiskObjectStorageMetadataHelper::updateObjectMetadata(const String & key, const ObjectAttributes & metadata) const
void DiskObjectStorageRemoteMetadataRestoreHelper::updateObjectMetadata(const String & key, const ObjectAttributes & metadata) const
{
disk->object_storage->copyObject(key, key, metadata);
}
void DiskObjectStorageMetadataHelper::migrateFileToRestorableSchema(const String & path) const
void DiskObjectStorageRemoteMetadataRestoreHelper::migrateFileToRestorableSchema(const String & path) const
{
LOG_TRACE(disk->log, "Migrate file {} to restorable schema", disk->metadata_disk->getPath() + path);
LOG_TRACE(disk->log, "Migrate file {} to restorable schema", disk->metadata_storage->getPath() + path);
auto meta = disk->readMetadata(path);
for (const auto & [key, _] : meta.remote_fs_objects)
auto blobs = disk->metadata_storage->getBlobs(path);
for (const auto & [key, _] : blobs)
{
ObjectAttributes metadata {
{"path", path}
@ -93,11 +92,11 @@ void DiskObjectStorageMetadataHelper::migrateFileToRestorableSchema(const String
updateObjectMetadata(disk->remote_fs_root_path + key, metadata);
}
}
void DiskObjectStorageMetadataHelper::migrateToRestorableSchemaRecursive(const String & path, Futures & results)
void DiskObjectStorageRemoteMetadataRestoreHelper::migrateToRestorableSchemaRecursive(const String & path, Futures & results)
{
checkStackSize(); /// This is needed to prevent stack overflow in case of cyclic symlinks.
LOG_TRACE(disk->log, "Migrate directory {} to restorable schema", disk->metadata_disk->getPath() + path);
LOG_TRACE(disk->log, "Migrate directory {} to restorable schema", disk->metadata_storage->getPath() + path);
bool dir_contains_only_files = true;
for (auto it = disk->iterateDirectory(path); it->isValid(); it->next())
@ -139,7 +138,7 @@ void DiskObjectStorageMetadataHelper::migrateToRestorableSchemaRecursive(const S
}
void DiskObjectStorageMetadataHelper::migrateToRestorableSchema()
void DiskObjectStorageRemoteMetadataRestoreHelper::migrateToRestorableSchema()
{
try
{
@ -166,7 +165,7 @@ void DiskObjectStorageMetadataHelper::migrateToRestorableSchema()
}
}
void DiskObjectStorageMetadataHelper::restore(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context)
void DiskObjectStorageRemoteMetadataRestoreHelper::restore(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context)
{
LOG_INFO(disk->log, "Restore operation for disk {} called", disk->name);
@ -223,7 +222,9 @@ void DiskObjectStorageMetadataHelper::restore(const Poco::Util::AbstractConfigur
restoreFiles(source_object_storage, information);
restoreFileOperations(source_object_storage, information);
disk->metadata_disk->removeFile(RESTORE_FILE_NAME);
auto tx = disk->metadata_storage->createTransaction();
tx->unlinkFile(RESTORE_FILE_NAME);
tx->commit();
saveSchemaVersion(RESTORABLE_SCHEMA_VERSION);
@ -237,20 +238,20 @@ void DiskObjectStorageMetadataHelper::restore(const Poco::Util::AbstractConfigur
}
}
void DiskObjectStorageMetadataHelper::readRestoreInformation(RestoreInformation & restore_information) /// NOLINT
void DiskObjectStorageRemoteMetadataRestoreHelper::readRestoreInformation(RestoreInformation & restore_information) /// NOLINT
{
auto buffer = disk->metadata_disk->readFile(RESTORE_FILE_NAME, ReadSettings{}, 512);
buffer->next();
auto metadata_str = disk->metadata_storage->readFileToString(RESTORE_FILE_NAME);
ReadBufferFromString buffer(metadata_str);
try
{
std::map<String, String> properties;
while (buffer->hasPendingData())
while (buffer.hasPendingData())
{
String property;
readText(property, *buffer);
assertChar('\n', *buffer);
readText(property, buffer);
assertChar('\n', buffer);
auto pos = property.find('=');
if (pos == std::string::npos || pos == 0 || pos == property.length())
@ -310,7 +311,7 @@ static std::tuple<UInt64, String> extractRevisionAndOperationFromKey(const Strin
return {(revision_str.empty() ? 0 : static_cast<UInt64>(std::bitset<64>(revision_str).to_ullong())), operation};
}
void DiskObjectStorageMetadataHelper::moveRecursiveOrRemove(const String & from_path, const String & to_path, bool send_metadata)
void DiskObjectStorageRemoteMetadataRestoreHelper::moveRecursiveOrRemove(const String & from_path, const String & to_path, bool send_metadata)
{
if (disk->exists(to_path))
{
@ -339,7 +340,7 @@ void DiskObjectStorageMetadataHelper::moveRecursiveOrRemove(const String & from_
}
}
void DiskObjectStorageMetadataHelper::restoreFiles(IObjectStorage * source_object_storage, const RestoreInformation & restore_information)
void DiskObjectStorageRemoteMetadataRestoreHelper::restoreFiles(IObjectStorage * source_object_storage, const RestoreInformation & restore_information)
{
LOG_INFO(disk->log, "Starting restore files for disk {}", disk->name);
@ -391,7 +392,7 @@ void DiskObjectStorageMetadataHelper::restoreFiles(IObjectStorage * source_objec
}
void DiskObjectStorageMetadataHelper::processRestoreFiles(IObjectStorage * source_object_storage, const String & source_path, const std::vector<String> & keys) const
void DiskObjectStorageRemoteMetadataRestoreHelper::processRestoreFiles(IObjectStorage * source_object_storage, const String & source_path, const std::vector<String> & keys) const
{
for (const auto & key : keys)
{
@ -422,13 +423,9 @@ void DiskObjectStorageMetadataHelper::processRestoreFiles(IObjectStorage * sourc
if (source_object_storage->getObjectsNamespace() != disk->object_storage->getObjectsNamespace() || disk->remote_fs_root_path != source_path)
source_object_storage->copyObjectToAnotherObjectStorage(key, disk->remote_fs_root_path + relative_key, *disk->object_storage);
auto updater = [relative_key, meta] (DiskObjectStorage::Metadata & metadata)
{
metadata.addObject(relative_key, meta.size_bytes);
return true;
};
disk->createUpdateAndStoreMetadata(path, false, updater);
auto tx = disk->metadata_storage->createTransaction();
tx->addBlobToMetadata(path, relative_key, meta.size_bytes);
tx->commit();
LOG_TRACE(disk->log, "Restored file {}", path);
}
@ -438,9 +435,11 @@ void DiskObjectStorageMetadataHelper::processRestoreFiles(IObjectStorage * sourc
void DiskObjectStorage::onFreeze(const String & path)
{
createDirectories(path);
auto revision_file_buf = metadata_disk->writeFile(path + "revision.txt", 32);
writeIntText(metadata_helper->revision_counter.load(), *revision_file_buf);
revision_file_buf->finalize();
auto tx = metadata_storage->createTransaction();
WriteBufferFromOwnString revision_file_buf ;
writeIntText(metadata_helper->revision_counter.load(), revision_file_buf);
tx->writeStringToFile(path + "revision.txt", revision_file_buf.str());
tx->commit();
}
static String pathToDetached(const String & source_path)
@ -450,7 +449,7 @@ static String pathToDetached(const String & source_path)
return fs::path(source_path).parent_path() / "detached/";
}
void DiskObjectStorageMetadataHelper::restoreFileOperations(IObjectStorage * source_object_storage, const RestoreInformation & restore_information)
void DiskObjectStorageRemoteMetadataRestoreHelper::restoreFileOperations(IObjectStorage * source_object_storage, const RestoreInformation & restore_information)
{
/// Enable recording file operations if we restore to different bucket / path.
bool send_metadata = source_object_storage->getObjectsNamespace() != disk->object_storage->getObjectsNamespace() || disk->remote_fs_root_path != restore_information.source_path;
@ -531,6 +530,7 @@ void DiskObjectStorageMetadataHelper::restoreFileOperations(IObjectStorage * sou
{
Strings not_finished_prefixes{"tmp_", "delete_tmp_", "attaching_", "deleting_"};
auto tx = disk->metadata_storage->createTransaction();
for (const auto & path : renames)
{
/// Skip already detached parts.
@ -557,12 +557,13 @@ void DiskObjectStorageMetadataHelper::restoreFileOperations(IObjectStorage * sou
to_path /= from_path.filename();
/// to_path may exist and non-empty in case for example abrupt restart, so remove it before rename
if (disk->metadata_disk->exists(to_path))
disk->metadata_disk->removeRecursive(to_path);
if (disk->metadata_storage->exists(to_path))
tx->removeRecursive(to_path);
disk->createDirectories(directoryPath(to_path));
disk->metadata_disk->moveDirectory(from_path, to_path);
tx->moveDirectory(from_path, to_path);
}
tx->commit();
}
LOG_INFO(disk->log, "File operations restored for disk {}", disk->name);

View File

@ -17,13 +17,13 @@ class DiskObjectStorage;
///
/// FIXME: this class is very intrusive and use a lot of DiskObjectStorage internals.
/// FIXME: it's very complex and unreliable, need to implement something better.
class DiskObjectStorageMetadataHelper
class DiskObjectStorageRemoteMetadataRestoreHelper
{
public:
static constexpr UInt64 LATEST_REVISION = std::numeric_limits<UInt64>::max();
static constexpr UInt64 UNKNOWN_REVISION = 0;
DiskObjectStorageMetadataHelper(DiskObjectStorage * disk_, ReadSettings read_settings_)
DiskObjectStorageRemoteMetadataRestoreHelper(DiskObjectStorage * disk_, ReadSettings read_settings_)
: disk(disk_)
, read_settings(std::move(read_settings_))
, operation_log_suffix("-" + getFQDNOrHostName())

View File

@ -1,6 +1,7 @@
#include <Disks/ObjectStorages/HDFS/HDFSObjectStorage.h>
#include <Disks/ObjectStorages/DiskObjectStorageCommon.h>
#include <Disks/ObjectStorages/DiskObjectStorage.h>
#include <Disks/ObjectStorages/MetadataStorageFromDisk.h>
#include <Disks/DiskFactory.h>
#include <Storages/HDFS/HDFSCommon.h>
@ -31,17 +32,20 @@ void registerDiskHDFS(DiskFactory & factory)
config.getInt(config_prefix + ".objects_chunk_size_to_delete", 1000),
context_->getSettingsRef().hdfs_replication
);
/// FIXME Cache currently unsupported :(
ObjectStoragePtr hdfs_storage = std::make_unique<HDFSObjectStorage>(nullptr, uri, std::move(settings), config);
auto metadata_disk = prepareForLocalMetadata(name, config, config_prefix, context_).second;
auto metadata_storage = std::make_shared<MetadataStorageFromDisk>(metadata_disk, uri);
uint64_t copy_thread_pool_size = config.getUInt(config_prefix + ".thread_pool_size", 16);
return std::make_shared<DiskObjectStorage>(
name,
uri,
"DiskHDFS",
metadata_disk,
std::move(metadata_storage),
std::move(hdfs_storage),
DiskType::HDFS,
/* send_metadata = */ false,

View File

@ -0,0 +1,139 @@
#pragma once
#include <memory>
#include <vector>
#include <unordered_map>
#include <Poco/Timestamp.h>
#include <IO/ReadSettings.h>
#include <IO/WriteSettings.h>
#include <IO/ReadBufferFromFileBase.h>
#include <IO/WriteBufferFromFileBase.h>
#include <Disks/DirectoryIterator.h>
#include <Disks/WriteMode.h>
#include <Disks/ObjectStorages/IObjectStorage.h>
namespace DB
{
struct IMetadataOperation
{
virtual void execute() = 0;
virtual void undo() = 0;
virtual void finalize() {}
virtual ~IMetadataOperation() = default;
};
using MetadataOperationPtr = std::unique_ptr<IMetadataOperation>;
class IMetadataStorage;
/// Tries to provide some "transactions" interface, which allow
/// to execute (commit) operations simultaneously. We don't provide
/// any snapshot isolation here, so no read operations in transactions
/// interface. This transaction is more like "batch operation" than real "transaction".
///
/// But for better usability we can get MetadataStorage interface and use some read methods.
struct IMetadataTransaction : private boost::noncopyable
{
public:
virtual void commit() = 0;
virtual const IMetadataStorage & getStorageForNonTransactionalReads() const = 0;
/// General purpose methods
/// Write metadata string to file
virtual void writeStringToFile(const std::string & path, const std::string & data) = 0;
virtual void setLastModified(const std::string & path, const Poco::Timestamp & timestamp) = 0;
virtual void setReadOnly(const std::string & path) = 0;
virtual void unlinkFile(const std::string & path) = 0;
virtual void createDirectory(const std::string & path) = 0;
virtual void createDicrectoryRecursive(const std::string & path) = 0;
virtual void removeDirectory(const std::string & path) = 0;
virtual void removeRecursive(const std::string & path) = 0;
virtual void createHardLink(const std::string & path_from, const std::string & path_to) = 0;
virtual void moveFile(const std::string & path_from, const std::string & path_to) = 0;
virtual void moveDirectory(const std::string & path_from, const std::string & path_to) = 0;
virtual void replaceFile(const std::string & path_from, const std::string & path_to) = 0;
/// Metadata related methods
/// Create empty file in metadata storage
virtual void createEmptyMetadataFile(const std::string & path) = 0;
/// Create metadata file on paths with content (blob_name, size_in_bytes)
virtual void createMetadataFile(const std::string & path, const std::string & blob_name, uint64_t size_in_bytes) = 0;
/// Add to new blob to metadata file (way to implement appends)
virtual void addBlobToMetadata(const std::string & path, const std::string & blob_name, uint64_t size_in_bytes) = 0;
/// Unlink metadata file and do something special if required
/// By default just remove file (unlink file).
virtual void unlinkMetadata(const std::string & path)
{
unlinkFile(path);
}
virtual ~IMetadataTransaction() = default;
};
using MetadataTransactionPtr = std::shared_ptr<IMetadataTransaction>;
/// Metadata storage for remote disks like DiskObjectStorage.
/// Support some subset of Disk operations, allow to read/write only
/// small amounts of data (strings).
class IMetadataStorage : private boost::noncopyable
{
public:
virtual MetadataTransactionPtr createTransaction() const = 0;
/// General purpose functions (similar to Disk)
virtual const std::string & getPath() const = 0;
virtual bool exists(const std::string & path) const = 0;
virtual bool isFile(const std::string & path) const = 0;
virtual bool isDirectory(const std::string & path) const = 0;
virtual uint64_t getFileSize(const std::string & path) const = 0;
virtual Poco::Timestamp getLastModified(const std::string & path) const = 0;
virtual std::vector<std::string> listDirectory(const std::string & path) const = 0;
virtual DirectoryIteratorPtr iterateDirectory(const std::string & path) = 0;
virtual uint32_t getHardlinkCount(const std::string & path) const = 0;
/// Read metadata file to string from path
virtual std::string readFileToString(const std::string & path) const = 0;
virtual ~IMetadataStorage() = default;
/// ==== More specefic methods. Previous were almost general purpose. ====
/// Read multiple metadata files into strings and return mapping from file_path -> metadata
virtual std::unordered_map<std::string, std::string> getSerializedMetadata(const std::vector<String> & file_paths) const = 0;
/// Return list of paths corresponding to metadata stored in local path
virtual std::vector<std::string> getRemotePaths(const std::string & path) const = 0;
/// Return [(remote_path, size_in_bytes), ...] for metadata path
virtual BlobsPathToSize getBlobs(const std::string & path) const = 0;
};
using MetadataStoragePtr = std::shared_ptr<IMetadataStorage>;
}

View File

@ -0,0 +1,667 @@
#include <Disks/ObjectStorages/MetadataStorageFromDisk.h>
#include <ranges>
#include <filesystem>
#include <Disks/TemporaryFileOnDisk.h>
#include <Poco/TemporaryFile.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Common/getRandomASCIIString.h>
namespace DB
{
namespace ErrorCodes
{
extern const int FS_METADATA_ERROR;
}
std::string toString(MetadataFromDiskTransactionState state)
{
switch (state)
{
case MetadataFromDiskTransactionState::PREPARING:
return "PREPARING";
case MetadataFromDiskTransactionState::FAILED:
return "FAILED";
case MetadataFromDiskTransactionState::COMMITTED:
return "COMMITTED";
case MetadataFromDiskTransactionState::PARTIALLY_ROLLED_BACK:
return "PARTIALLY_ROLLED_BACK";
}
__builtin_unreachable();
}
namespace
{
std::string getTempFileName(const std::string & dir)
{
return fs::path(dir) / getRandomASCIIString();
}
class SetLastModifiedOperation final : public IMetadataOperation
{
std::string path;
Poco::Timestamp new_timestamp;
Poco::Timestamp old_timestamp;
IDisk & disk;
public:
SetLastModifiedOperation(const std::string & path_, Poco::Timestamp new_timestamp_, IDisk & disk_)
: path(path_)
, new_timestamp(new_timestamp_)
, disk(disk_)
{}
void execute() override
{
old_timestamp = disk.getLastModified(path);
disk.setLastModified(path, new_timestamp);
}
void undo() override
{
disk.setLastModified(path, old_timestamp);
}
};
class UnlinkFileOperation final : public IMetadataOperation
{
std::string path;
IDisk & disk;
std::string prev_data;
public:
UnlinkFileOperation(const std::string & path_, IDisk & disk_)
: path(path_)
, disk(disk_)
{
}
void execute() override
{
auto buf = disk.readFile(path);
readStringUntilEOF(prev_data, *buf);
disk.removeFile(path);
}
void undo() override
{
auto buf = disk.writeFile(path);
writeString(prev_data, *buf);
buf->finalize();
}
};
class CreateDirectoryOperation final : public IMetadataOperation
{
private:
std::string path;
IDisk & disk;
public:
CreateDirectoryOperation(const std::string & path_, IDisk & disk_)
: path(path_)
, disk(disk_)
{
}
void execute() override
{
disk.createDirectory(path);
}
void undo() override
{
disk.removeDirectory(path);
}
};
class CreateDirectoryRecursiveOperation final : public IMetadataOperation
{
private:
std::string path;
std::vector<std::string> paths_created;
IDisk & disk;
public:
CreateDirectoryRecursiveOperation(const std::string & path_, IDisk & disk_)
: path(path_)
, disk(disk_)
{
}
void execute() override
{
namespace fs = std::filesystem;
fs::path p(path);
while (!disk.exists(p))
{
paths_created.push_back(p);
if (!p.has_parent_path())
break;
p = p.parent_path();
}
for (const auto & path_to_create : paths_created | std::views::reverse)
disk.createDirectory(path_to_create);
}
void undo() override
{
for (const auto & path_created : paths_created)
disk.removeDirectory(path_created);
}
};
class RemoveDirectoryOperation final : public IMetadataOperation
{
private:
std::string path;
IDisk & disk;
public:
RemoveDirectoryOperation(const std::string & path_, IDisk & disk_)
: path(path_)
, disk(disk_)
{}
void execute() override
{
disk.removeDirectory(path);
}
void undo() override
{
disk.createDirectory(path);
}
};
class RemoveRecursiveOperation final : public IMetadataOperation
{
std::string path;
IDisk & disk;
std::string temp_path;
public:
RemoveRecursiveOperation(const std::string & path_, IDisk & disk_)
: path(path_)
, disk(disk_)
, temp_path(getTempFileName(fs::path(path).parent_path()))
{
}
void execute() override
{
if (disk.isFile(path))
disk.moveFile(path, temp_path);
else if (disk.isDirectory(path))
disk.moveDirectory(path, temp_path);
}
void undo() override
{
if (disk.isFile(temp_path))
disk.moveFile(temp_path, path);
else if (disk.isDirectory(temp_path))
disk.moveDirectory(temp_path, path);
}
void finalize() override
{
if (disk.exists(temp_path))
disk.removeRecursive(temp_path);
if (disk.exists(path))
disk.removeRecursive(path);
}
};
class CreateHardlinkOperation final : public IMetadataOperation
{
private:
std::string path_from;
std::string path_to;
IDisk & disk;
public:
CreateHardlinkOperation(const std::string & path_from_, const std::string & path_to_, IDisk & disk_)
: path_from(path_from_)
, path_to(path_to_)
, disk(disk_)
{}
void execute() override
{
disk.createHardLink(path_from, path_to);
}
void undo() override
{
disk.removeFile(path_to);
}
};
class MoveFileOperation final : public IMetadataOperation
{
private:
std::string path_from;
std::string path_to;
IDisk & disk;
public:
MoveFileOperation(const std::string & path_from_, const std::string & path_to_, IDisk & disk_)
: path_from(path_from_)
, path_to(path_to_)
, disk(disk_)
{}
void execute() override
{
disk.moveFile(path_from, path_to);
}
void undo() override
{
disk.moveFile(path_to, path_from);
}
};
class MoveDirectoryOperation final : public IMetadataOperation
{
private:
std::string path_from;
std::string path_to;
IDisk & disk;
public:
MoveDirectoryOperation(const std::string & path_from_, const std::string & path_to_, IDisk & disk_)
: path_from(path_from_)
, path_to(path_to_)
, disk(disk_)
{}
void execute() override
{
disk.moveDirectory(path_from, path_to);
}
void undo() override
{
disk.moveDirectory(path_to, path_from);
}
};
class ReplaceFileOperation final : public IMetadataOperation
{
private:
std::string path_from;
std::string path_to;
IDisk & disk;
std::string temp_path_to;
public:
ReplaceFileOperation(const std::string & path_from_, const std::string & path_to_, IDisk & disk_)
: path_from(path_from_)
, path_to(path_to_)
, disk(disk_)
, temp_path_to(getTempFileName(fs::path(path_to).parent_path()))
{
}
void execute() override
{
if (disk.exists(path_to))
disk.moveFile(path_to, temp_path_to);
disk.replaceFile(path_from, path_to);
}
void undo() override
{
disk.moveFile(path_to, path_from);
disk.moveFile(temp_path_to, path_to);
}
void finalize() override
{
disk.removeFileIfExists(temp_path_to);
}
};
class WriteFileOperation final : public IMetadataOperation
{
private:
std::string path;
IDisk & disk;
std::string data;
bool existed = false;
std::string prev_data;
public:
WriteFileOperation(const std::string & path_, IDisk & disk_, const std::string & data_)
: path(path_)
, disk(disk_)
, data(data_)
{}
void execute() override
{
if (disk.exists(path))
{
existed = true;
auto buf = disk.readFile(path);
readStringUntilEOF(prev_data, *buf);
}
auto buf = disk.writeFile(path);
writeString(data, *buf);
buf->finalize();
}
void undo() override
{
if (!existed)
disk.removeFileIfExists(path);
else
{
auto buf = disk.writeFile(path);
writeString(prev_data, *buf);
}
}
};
}
void MetadataStorageFromDiskTransaction::writeStringToFile( /// NOLINT
const std::string & path,
const std::string & data)
{
addOperation(std::make_unique<WriteFileOperation>(path, *metadata_storage.disk, data));
}
void MetadataStorageFromDiskTransaction::addOperation(MetadataOperationPtr && operation)
{
if (state != MetadataFromDiskTransactionState::PREPARING)
throw Exception(ErrorCodes::FS_METADATA_ERROR, "Cannot add operations to transaction in {} state, it should be in {} state",
toString(state), toString(MetadataFromDiskTransactionState::PREPARING));
operations.emplace_back(std::move(operation));
}
void MetadataStorageFromDiskTransaction::commit()
{
if (state != MetadataFromDiskTransactionState::PREPARING)
throw Exception(ErrorCodes::FS_METADATA_ERROR, "Cannot commit transaction in {} state, it should be in {} state",
toString(state), toString(MetadataFromDiskTransactionState::PREPARING));
{
std::unique_lock lock(metadata_storage.metadata_mutex);
for (size_t i = 0; i < operations.size(); ++i)
{
try
{
operations[i]->execute();
}
catch (Exception & ex)
{
ex.addMessage(fmt::format("While committing operation #{}", i));
state = MetadataFromDiskTransactionState::FAILED;
rollback(i);
throw;
}
}
}
/// Do it in "best effort" mode
for (size_t i = 0; i < operations.size(); ++i)
{
try
{
operations[i]->finalize();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__, fmt::format("Failed to finalize operation #{}", i));
}
}
state = MetadataFromDiskTransactionState::COMMITTED;
}
void MetadataStorageFromDiskTransaction::rollback(size_t until_pos)
{
/// Otherwise everything is alright
if (state == MetadataFromDiskTransactionState::FAILED)
{
for (int64_t i = until_pos; i >= 0; --i)
{
try
{
operations[i]->undo();
}
catch (Exception & ex)
{
state = MetadataFromDiskTransactionState::PARTIALLY_ROLLED_BACK;
ex.addMessage(fmt::format("While rolling back operation #{}", i));
throw;
}
}
}
else
{
/// Nothing to do, transaction committed or not even started to commit
}
}
const std::string & MetadataStorageFromDisk::getPath() const
{
return disk->getPath();
}
bool MetadataStorageFromDisk::exists(const std::string & path) const
{
return disk->exists(path);
}
bool MetadataStorageFromDisk::isFile(const std::string & path) const
{
return disk->isFile(path);
}
bool MetadataStorageFromDisk::isDirectory(const std::string & path) const
{
return disk->isDirectory(path);
}
Poco::Timestamp MetadataStorageFromDisk::getLastModified(const std::string & path) const
{
return disk->getLastModified(path);
}
uint64_t MetadataStorageFromDisk::getFileSize(const String & path) const
{
auto metadata = readMetadata(path);
return metadata->getTotalSizeBytes();
}
std::vector<std::string> MetadataStorageFromDisk::listDirectory(const std::string & path) const
{
std::vector<std::string> result_files;
disk->listFiles(path, result_files);
return result_files;
}
DirectoryIteratorPtr MetadataStorageFromDisk::iterateDirectory(const std::string & path)
{
return disk->iterateDirectory(path);
}
std::string MetadataStorageFromDisk::readFileToString(const std::string & path) const
{
auto buf = disk->readFile(path);
std::string result;
readStringUntilEOF(result, *buf);
return result;
}
void MetadataStorageFromDiskTransaction::createEmptyMetadataFile(const std::string & path)
{
auto metadata = std::make_unique<DiskObjectStorageMetadata>(metadata_storage.disk->getPath(), metadata_storage.root_path_for_remote_metadata, path);
writeStringToFile(path, metadata->serializeToString());
}
void MetadataStorageFromDiskTransaction::setLastModified(const std::string & path, const Poco::Timestamp & timestamp)
{
addOperation(std::make_unique<SetLastModifiedOperation>(path, timestamp, *metadata_storage.disk));
}
void MetadataStorageFromDiskTransaction::unlinkFile(const std::string & path)
{
addOperation(std::make_unique<UnlinkFileOperation>(path, *metadata_storage.disk));
}
void MetadataStorageFromDiskTransaction::removeRecursive(const std::string & path)
{
addOperation(std::make_unique<RemoveRecursiveOperation>(path, *metadata_storage.disk));
}
void MetadataStorageFromDiskTransaction::createDirectory(const std::string & path)
{
addOperation(std::make_unique<CreateDirectoryOperation>(path, *metadata_storage.disk));
}
void MetadataStorageFromDiskTransaction::createDicrectoryRecursive(const std::string & path)
{
addOperation(std::make_unique<CreateDirectoryRecursiveOperation>(path, *metadata_storage.disk));
}
void MetadataStorageFromDiskTransaction::removeDirectory(const std::string & path)
{
addOperation(std::make_unique<RemoveDirectoryOperation>(path, *metadata_storage.disk));
}
void MetadataStorageFromDiskTransaction::createHardLink(const std::string & path_from, const std::string & path_to)
{
auto metadata = metadata_storage.readMetadata(path_from);
metadata->incrementRefCount();
writeStringToFile(path_from, metadata->serializeToString());
addOperation(std::make_unique<CreateHardlinkOperation>(path_from, path_to, *metadata_storage.disk));
}
void MetadataStorageFromDiskTransaction::moveFile(const std::string & path_from, const std::string & path_to)
{
addOperation(std::make_unique<MoveFileOperation>(path_from, path_to, *metadata_storage.disk));
}
void MetadataStorageFromDiskTransaction::moveDirectory(const std::string & path_from, const std::string & path_to)
{
addOperation(std::make_unique<MoveDirectoryOperation>(path_from, path_to, *metadata_storage.disk));
}
void MetadataStorageFromDiskTransaction::replaceFile(const std::string & path_from, const std::string & path_to)
{
addOperation(std::make_unique<ReplaceFileOperation>(path_from, path_to, *metadata_storage.disk));
}
void MetadataStorageFromDiskTransaction::setReadOnly(const std::string & path)
{
auto metadata = metadata_storage.readMetadata(path);
metadata->setReadOnly();
writeStringToFile(path, metadata->serializeToString());
}
void MetadataStorageFromDiskTransaction::createMetadataFile(const std::string & path, const std::string & blob_name, uint64_t size_in_bytes)
{
DiskObjectStorageMetadataPtr metadata = std::make_unique<DiskObjectStorageMetadata>(metadata_storage.disk->getPath(), metadata_storage.root_path_for_remote_metadata, path);
metadata->addObject(blob_name, size_in_bytes);
writeStringToFile(path, metadata->serializeToString());
}
void MetadataStorageFromDiskTransaction::addBlobToMetadata(const std::string & path, const std::string & blob_name, uint64_t size_in_bytes)
{
DiskObjectStorageMetadataPtr metadata;
if (metadata_storage.exists(path))
{
metadata = metadata_storage.readMetadata(path);
}
else
{
metadata = std::make_unique<DiskObjectStorageMetadata>(metadata_storage.disk->getPath(), metadata_storage.root_path_for_remote_metadata, path);
}
metadata->addObject(blob_name, size_in_bytes);
writeStringToFile(path, metadata->serializeToString());
}
DiskObjectStorageMetadataPtr MetadataStorageFromDisk::readMetadataUnlocked(const std::string & path, std::shared_lock<std::shared_mutex> &) const
{
auto metadata = std::make_unique<DiskObjectStorageMetadata>(disk->getPath(), root_path_for_remote_metadata, path);
auto str = readFileToString(path);
metadata->deserializeFromString(str);
return metadata;
}
DiskObjectStorageMetadataPtr MetadataStorageFromDisk::readMetadata(const std::string & path) const
{
std::shared_lock lock(metadata_mutex);
return readMetadataUnlocked(path, lock);
}
std::unordered_map<String, String> MetadataStorageFromDisk::getSerializedMetadata(const std::vector<String> & file_paths) const
{
std::shared_lock lock(metadata_mutex);
std::unordered_map<String, String> metadatas;
for (const auto & path : file_paths)
{
auto metadata = readMetadataUnlocked(path, lock);
metadata->resetRefCount();
WriteBufferFromOwnString buf;
metadata->serialize(buf, false);
metadatas[path] = buf.str();
}
return metadatas;
}
MetadataTransactionPtr MetadataStorageFromDisk::createTransaction() const
{
return std::make_shared<MetadataStorageFromDiskTransaction>(*this);
}
std::vector<std::string> MetadataStorageFromDisk::getRemotePaths(const std::string & path) const
{
auto metadata = readMetadata(path);
std::vector<std::string> remote_paths;
auto blobs = metadata->getBlobs();
auto root_path = metadata->getBlobsCommonPrefix();
remote_paths.reserve(blobs.size());
for (const auto & [remote_path, _] : blobs)
remote_paths.push_back(fs::path(root_path) / remote_path);
return remote_paths;
}
uint32_t MetadataStorageFromDisk::getHardlinkCount(const std::string & path) const
{
auto metadata = readMetadata(path);
return metadata->getRefCount();
}
BlobsPathToSize MetadataStorageFromDisk::getBlobs(const std::string & path) const
{
auto metadata = readMetadata(path);
return metadata->getBlobs();
}
void MetadataStorageFromDiskTransaction::unlinkMetadata(const std::string & path)
{
auto metadata = metadata_storage.readMetadata(path);
uint32_t ref_count = metadata->getRefCount();
if (ref_count != 0)
{
metadata->decrementRefCount();
writeStringToFile(path, metadata->serializeToString());
}
unlinkFile(path);
}
}

View File

@ -0,0 +1,130 @@
#pragma once
#include <Disks/ObjectStorages/IMetadataStorage.h>
#include <Disks/IDisk.h>
#include <Disks/ObjectStorages/DiskObjectStorageMetadata.h>
namespace DB
{
enum class MetadataFromDiskTransactionState
{
PREPARING,
FAILED,
COMMITTED,
PARTIALLY_ROLLED_BACK,
};
std::string toString(MetadataFromDiskTransactionState state);
class MetadataStorageFromDisk final : public IMetadataStorage
{
private:
friend struct MetadataStorageFromDiskTransaction;
DiskPtr disk;
std::string root_path_for_remote_metadata;
mutable std::shared_mutex metadata_mutex;
public:
MetadataStorageFromDisk(DiskPtr disk_, const std::string & root_path_from_remote_metadata_)
: disk(disk_)
, root_path_for_remote_metadata(root_path_from_remote_metadata_)
{
}
MetadataTransactionPtr createTransaction() const override;
const std::string & getPath() const override;
bool exists(const std::string & path) const override;
bool isFile(const std::string & path) const override;
bool isDirectory(const std::string & path) const override;
uint64_t getFileSize(const String & path) const override;
Poco::Timestamp getLastModified(const std::string & path) const override;
std::vector<std::string> listDirectory(const std::string & path) const override;
DirectoryIteratorPtr iterateDirectory(const std::string & path) override;
std::string readFileToString(const std::string & path) const override;
std::unordered_map<String, String> getSerializedMetadata(const std::vector<String> & file_paths) const override;
BlobsPathToSize getBlobs(const std::string & path) const override;
std::vector<std::string> getRemotePaths(const std::string & path) const override;
uint32_t getHardlinkCount(const std::string & path) const override;
private:
DiskObjectStorageMetadataPtr readMetadata(const std::string & path) const;
DiskObjectStorageMetadataPtr readMetadataUnlocked(const std::string & path, std::shared_lock<std::shared_mutex> & lock) const;
};
struct MetadataStorageFromDiskTransaction final : public IMetadataTransaction
{
private:
const MetadataStorageFromDisk & metadata_storage;
std::vector<MetadataOperationPtr> operations;
MetadataFromDiskTransactionState state{MetadataFromDiskTransactionState::PREPARING};
void addOperation(MetadataOperationPtr && operation);
void rollback(size_t until_pos);
public:
explicit MetadataStorageFromDiskTransaction(const MetadataStorageFromDisk & metadata_storage_)
: metadata_storage(metadata_storage_)
{}
const IMetadataStorage & getStorageForNonTransactionalReads() const override
{
return metadata_storage;
}
void commit() override;
void writeStringToFile(const std::string & path, const std::string & data) override;
void createEmptyMetadataFile(const std::string & path) override;
void createMetadataFile(const std::string & path, const std::string & blob_name, uint64_t size_in_bytes) override;
void addBlobToMetadata(const std::string & path, const std::string & blob_name, uint64_t size_in_bytes) override;
void setLastModified(const std::string & path, const Poco::Timestamp & timestamp) override;
void setReadOnly(const std::string & path) override;
void unlinkFile(const std::string & path) override;
void createDirectory(const std::string & path) override;
void createDicrectoryRecursive(const std::string & path) override;
void removeDirectory(const std::string & path) override;
void removeRecursive(const std::string & path) override;
void createHardLink(const std::string & path_from, const std::string & path_to) override;
void moveFile(const std::string & path_from, const std::string & path_to) override;
void moveDirectory(const std::string & path_from, const std::string & path_to) override;
void replaceFile(const std::string & path_from, const std::string & path_to) override;
void unlinkMetadata(const std::string & path) override;
~MetadataStorageFromDiskTransaction() override = default;
};
}

View File

@ -24,6 +24,7 @@
#include <Disks/ObjectStorages/S3/ProxyResolverConfiguration.h>
#include <Disks/ObjectStorages/S3/S3ObjectStorage.h>
#include <Disks/ObjectStorages/S3/diskSettings.h>
#include <Disks/ObjectStorages/MetadataStorageFromDisk.h>
#include <IO/S3Common.h>
@ -85,6 +86,8 @@ void registerDiskS3(DiskFactory & factory)
auto [metadata_path, metadata_disk] = prepareForLocalMetadata(name, config, config_prefix, context);
auto metadata_storage = std::make_shared<MetadataStorageFromDisk>(metadata_disk, uri.key);
FileCachePtr cache = getCachePtrForDisk(name, config, config_prefix, context);
ObjectStoragePtr s3_storage = std::make_unique<S3ObjectStorage>(
@ -99,7 +102,7 @@ void registerDiskS3(DiskFactory & factory)
name,
uri.key,
"DiskS3",
metadata_disk,
std::move(metadata_storage),
std::move(s3_storage),
DiskType::S3,
send_metadata,

View File

@ -18,7 +18,7 @@ using DataPartsVector = std::vector<DataPartPtr>;
/// This object is responsible for tracking all changes that some transaction is making in MergeTree tables.
/// It collects all changes that queries of current transaction made in data part sets of all MergeTree tables
/// to ether make them visible when transaction commits or undo when transaction rolls back.
/// to either make them visible when transaction commits or undo when transaction rolls back.
class MergeTreeTransaction : public std::enable_shared_from_this<MergeTreeTransaction>, private boost::noncopyable
{
friend class TransactionLog;

View File

@ -1986,6 +1986,17 @@ void MergeTreeData::rename(const String & new_table_path, const StorageID & new_
throw Exception{"Target path already exists: " + fullPath(disk, new_table_path), ErrorCodes::DIRECTORY_ALREADY_EXISTS};
}
{
/// Relies on storage path, so we drop it during rename
/// it will be recreated automatiaclly.
std::lock_guard wal_lock(write_ahead_log_mutex);
if (write_ahead_log)
{
write_ahead_log->shutdown();
write_ahead_log.reset();
}
}
for (const auto & disk : disks)
{
auto new_table_path_parent = parentPath(new_table_path);
@ -1997,7 +2008,10 @@ void MergeTreeData::rename(const String & new_table_path, const StorageID & new_
getContext()->dropCaches();
relative_data_path = new_table_path;
renameInMemory(new_table_id);
}
void MergeTreeData::dropAllData()
@ -2013,6 +2027,12 @@ void MergeTreeData::dropAllData()
data_parts_indexes.clear();
column_sizes.clear();
{
std::lock_guard wal_lock(write_ahead_log_mutex);
if (write_ahead_log)
write_ahead_log->shutdown();
}
/// Tables in atomic databases have UUID and stored in persistent locations.
/// No need to drop caches (that are keyed by filesystem path) because collision is not possible.
if (!getStorageID().hasUUID())

View File

@ -17,7 +17,7 @@ std::optional<std::string> MergeTreeIndexGranularityInfo::getMarksExtensionFromF
{
if (disk->exists(path_to_part))
{
for (DiskDirectoryIteratorPtr it = disk->iterateDirectory(path_to_part); it->isValid(); it->next())
for (DirectoryIteratorPtr it = disk->iterateDirectory(path_to_part); it->isValid(); it->next())
{
const auto & ext = fs::path(it->path()).extension();
if (ext == getNonAdaptiveMrkExtension()

View File

@ -46,9 +46,14 @@ MergeTreeWriteAheadLog::MergeTreeWriteAheadLog(
MergeTreeWriteAheadLog::~MergeTreeWriteAheadLog()
{
std::unique_lock lock(write_mutex);
if (sync_scheduled)
sync_cv.wait(lock, [this] { return !sync_scheduled; });
try
{
shutdown();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
void MergeTreeWriteAheadLog::init()
@ -252,6 +257,25 @@ void MergeTreeWriteAheadLog::sync(std::unique_lock<std::mutex> & lock)
sync_cv.wait(lock, [this] { return !sync_scheduled; });
}
void MergeTreeWriteAheadLog::shutdown()
{
{
std::unique_lock lock(write_mutex);
if (shutdown_called)
return;
if (sync_scheduled)
sync_cv.wait(lock, [this] { return !sync_scheduled; });
shutdown_called = true;
out->finalize();
out.reset();
}
/// Do it without lock, otherwise inversion between pool lock and write_mutex is possible
sync_task->deactivate();
}
std::optional<MergeTreeWriteAheadLog::MinMaxBlockNumber>
MergeTreeWriteAheadLog::tryParseMinMaxBlockNumber(const String & filename)
{

View File

@ -66,6 +66,7 @@ public:
using MinMaxBlockNumber = std::pair<Int64, Int64>;
static std::optional<MinMaxBlockNumber> tryParseMinMaxBlockNumber(const String & filename);
void shutdown();
private:
void init();
@ -89,6 +90,7 @@ private:
size_t bytes_at_last_sync = 0;
bool sync_scheduled = false;
bool shutdown_called = false;
mutable std::mutex write_mutex;

View File

@ -10,6 +10,8 @@
#include <Common/thread_local_rng.h>
#include <Common/typeid_cast.h>
#include <Disks/ObjectStorages/IMetadataStorage.h>
#include <base/sort.h>
#include <Storages/AlterCommands.h>
@ -1239,10 +1241,17 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
*/
DataParts unexpected_parts;
/// Intersection of local parts and expected parts
ActiveDataPartSet local_expected_parts_set(format_version);
/// Collect unexpected parts
for (const auto & part : parts)
if (!expected_parts.contains(part->name))
{
if (expected_parts.contains(part->name))
local_expected_parts_set.add(part->name);
else
unexpected_parts.insert(part); /// this parts we will place to detached with ignored_ prefix
}
/// Which parts should be taken from other replicas.
Strings parts_to_fetch;
@ -1259,15 +1268,32 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
UInt64 unexpected_parts_nonnew_rows = 0;
UInt64 unexpected_parts_rows = 0;
Strings covered_unexpected_parts;
Strings uncovered_unexpected_parts;
UInt64 uncovered_unexpected_parts_rows = 0;
for (const auto & part : unexpected_parts)
{
unexpected_parts_rows += part->rows_count;
/// This part may be covered by some expected part that is active and present locally
/// Probably we just did not remove this part from disk before restart (but removed from ZooKeeper)
String covering_local_part = local_expected_parts_set.getContainingPart(part->name);
if (!covering_local_part.empty())
{
covered_unexpected_parts.push_back(part->name);
continue;
}
/// Part is unexpected and we don't have covering part: it's suspicious
uncovered_unexpected_parts.push_back(part->name);
uncovered_unexpected_parts_rows += part->rows_count;
if (part->info.level > 0)
{
++unexpected_parts_nonnew;
unexpected_parts_nonnew_rows += part->rows_count;
}
unexpected_parts_rows += part->rows_count;
}
const UInt64 parts_to_fetch_blocks = std::accumulate(parts_to_fetch.cbegin(), parts_to_fetch.cend(), 0,
@ -1294,27 +1320,33 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
total_rows_on_filesystem += part->rows_count;
const auto storage_settings_ptr = getSettings();
bool insane = unexpected_parts_rows > total_rows_on_filesystem * storage_settings_ptr->replicated_max_ratio_of_wrong_parts;
bool insane = uncovered_unexpected_parts_rows > total_rows_on_filesystem * storage_settings_ptr->replicated_max_ratio_of_wrong_parts;
constexpr const char * sanity_report_fmt = "The local set of parts of table {} doesn't look like the set of parts in ZooKeeper: "
"{} rows of {} total rows in filesystem are suspicious. "
"There are {} unexpected parts with {} rows ({} of them is not just-written with {} rows), "
"{} missing parts (with {} blocks).";
"There are {} uncovered unexpected parts with {} rows ({} of them is not just-written with {} rows), "
"{} missing parts (with {} blocks), {} covered unexpected parts (with {} rows).";
constexpr const char * sanity_report_debug_fmt = "Uncovered unexpected parts: {}. Missing parts: {}. Covered unexpected parts: {}. Expected parts: {}.";
if (insane && !skip_sanity_checks)
{
LOG_DEBUG(log, sanity_report_debug_fmt, fmt::join(uncovered_unexpected_parts, ", "), fmt::join(parts_to_fetch, ", "),
fmt::join(covered_unexpected_parts, ", "), fmt::join(expected_parts, ", "));
throw Exception(ErrorCodes::TOO_MANY_UNEXPECTED_DATA_PARTS, sanity_report_fmt, getStorageID().getNameForLogs(),
formatReadableQuantity(unexpected_parts_rows), formatReadableQuantity(total_rows_on_filesystem),
unexpected_parts.size(), unexpected_parts_rows, unexpected_parts_nonnew, unexpected_parts_nonnew_rows,
parts_to_fetch.size(), parts_to_fetch_blocks);
formatReadableQuantity(uncovered_unexpected_parts_rows), formatReadableQuantity(total_rows_on_filesystem),
uncovered_unexpected_parts.size(), uncovered_unexpected_parts_rows, unexpected_parts_nonnew, unexpected_parts_nonnew_rows,
parts_to_fetch.size(), parts_to_fetch_blocks, covered_unexpected_parts.size(), unexpected_parts_rows - uncovered_unexpected_parts_rows);
}
if (unexpected_parts_nonnew_rows > 0)
if (unexpected_parts_nonnew_rows > 0 || uncovered_unexpected_parts_rows > 0)
{
LOG_DEBUG(log, sanity_report_debug_fmt, fmt::join(uncovered_unexpected_parts, ", "), fmt::join(parts_to_fetch, ", "),
fmt::join(covered_unexpected_parts, ", "), fmt::join(expected_parts, ", "));
LOG_WARNING(log, fmt::runtime(sanity_report_fmt), getStorageID().getNameForLogs(),
formatReadableQuantity(unexpected_parts_rows), formatReadableQuantity(total_rows_on_filesystem),
unexpected_parts.size(), unexpected_parts_rows, unexpected_parts_nonnew, unexpected_parts_nonnew_rows,
parts_to_fetch.size(), parts_to_fetch_blocks);
formatReadableQuantity(uncovered_unexpected_parts_rows), formatReadableQuantity(total_rows_on_filesystem),
uncovered_unexpected_parts.size(), uncovered_unexpected_parts_rows, unexpected_parts_nonnew, unexpected_parts_nonnew_rows,
parts_to_fetch.size(), parts_to_fetch_blocks, covered_unexpected_parts.size(), unexpected_parts_rows - uncovered_unexpected_parts_rows);
}
/// Add to the queue jobs to pick up the missing parts from other replicas and remove from ZK the information that we have them.
@ -8144,56 +8176,68 @@ public:
void save(DiskPtr data_disk, const String & path) const
{
auto metadata_disk = data_disk->getMetadataDiskIfExistsOrSelf();
auto metadata_storage = data_disk->getMetadataStorage();
auto file_path = getFileName(path);
auto buffer = metadata_disk->writeFile(file_path, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite);
writeIntText(version, *buffer);
buffer->write("\n", 1);
writeBoolText(is_replicated, *buffer);
buffer->write("\n", 1);
writeBoolText(is_remote, *buffer);
buffer->write("\n", 1);
writeString(replica_name, *buffer);
buffer->write("\n", 1);
writeString(zookeeper_name, *buffer);
buffer->write("\n", 1);
writeString(table_shared_id, *buffer);
buffer->write("\n", 1);
auto tx = metadata_storage->createTransaction();
WriteBufferFromOwnString buffer;
writeIntText(version, buffer);
buffer.write("\n", 1);
writeBoolText(is_replicated, buffer);
buffer.write("\n", 1);
writeBoolText(is_remote, buffer);
buffer.write("\n", 1);
writeString(replica_name, buffer);
buffer.write("\n", 1);
writeString(zookeeper_name, buffer);
buffer.write("\n", 1);
writeString(table_shared_id, buffer);
buffer.write("\n", 1);
tx->writeStringToFile(file_path, buffer.str());
tx->commit();
}
bool load(DiskPtr data_disk, const String & path)
{
auto metadata_disk = data_disk->getMetadataDiskIfExistsOrSelf();
auto metadata_storage = data_disk->getMetadataStorage();
auto file_path = getFileName(path);
if (!metadata_disk->exists(file_path))
if (!metadata_storage->exists(file_path))
return false;
auto buffer = metadata_disk->readFile(file_path, ReadSettings(), {});
readIntText(version, *buffer);
auto metadata_str = metadata_storage->readFileToString(file_path);
ReadBufferFromString buffer(metadata_str);
readIntText(version, buffer);
if (version != 1)
{
LOG_ERROR(&Poco::Logger::get("FreezeMetaData"), "Unknown freezed metadata version: {}", version);
return false;
}
DB::assertChar('\n', *buffer);
readBoolText(is_replicated, *buffer);
DB::assertChar('\n', *buffer);
readBoolText(is_remote, *buffer);
DB::assertChar('\n', *buffer);
readString(replica_name, *buffer);
DB::assertChar('\n', *buffer);
readString(zookeeper_name, *buffer);
DB::assertChar('\n', *buffer);
readString(table_shared_id, *buffer);
DB::assertChar('\n', *buffer);
DB::assertChar('\n', buffer);
readBoolText(is_replicated, buffer);
DB::assertChar('\n', buffer);
readBoolText(is_remote, buffer);
DB::assertChar('\n', buffer);
readString(replica_name, buffer);
DB::assertChar('\n', buffer);
readString(zookeeper_name, buffer);
DB::assertChar('\n', buffer);
readString(table_shared_id, buffer);
DB::assertChar('\n', buffer);
return true;
}
static void clean(DiskPtr data_disk, const String & path)
{
auto metadata_disk = data_disk->getMetadataDiskIfExistsOrSelf();
metadata_disk->removeFileIfExists(getFileName(path));
auto metadata_storage = data_disk->getMetadataStorage();
auto fname = getFileName(path);
if (metadata_storage->exists(fname))
{
auto tx = metadata_storage->createTransaction();
tx->unlinkFile(fname);
tx->commit();
}
}
private:

View File

@ -38,6 +38,8 @@ const char * auto_config_build[]
"USE_PROTOBUF", "@USE_PROTOBUF@",
"USE_BROTLI", "@USE_BROTLI@",
"USE_SSL", "@USE_SSL@",
"OPENSSL_VERSION", "@OPENSSL_VERSION@",
"OPENSSL_IS_BORING_SSL", "@OPENSSL_IS_BORING_SSL@",
"USE_HYPERSCAN", "@ENABLE_HYPERSCAN@",
"USE_SIMDJSON", "@USE_SIMDJSON@",
"USE_ODBC", "@USE_ODBC@",

View File

@ -79,12 +79,16 @@ def test_merge_doesnt_work_without_zookeeper(start_cluster):
== "2\n"
)
# Parts may be moved to Deleting state and then back in Outdated state.
# But system.parts returns only Active and Outdated parts if _state column is not queried.
with PartitionManager() as pm:
node1.query("OPTIMIZE TABLE test_table FINAL")
pm.drop_instance_zk_connections(node1)
# unfortunately we can be too fast and delete node before partition with ZK
if (
node1.query("SELECT count(*) from system.parts where table = 'test_table'")
node1.query(
"SELECT count(*) from system.parts where table = 'test_table' and _state!='dummy'"
)
== "1\n"
):
print("We were too fast and deleted parts before partition with ZK")
@ -92,7 +96,7 @@ def test_merge_doesnt_work_without_zookeeper(start_cluster):
time.sleep(10) # > old_parts_lifetime
assert (
node1.query(
"SELECT count(*) from system.parts where table = 'test_table'"
"SELECT count(*) from system.parts where table = 'test_table' and _state!='dummy'"
)
== "3\n"
)