mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 00:30:49 +00:00
backward compatibility and implementation feature storage_metadata_write_full_object_key
This commit is contained in:
parent
a40cc397a6
commit
90b64bcdb9
@ -4804,3 +4804,10 @@ LIFETIME(MIN 0 MAX 3600)
|
|||||||
LAYOUT(COMPLEX_KEY_HASHED_ARRAY())
|
LAYOUT(COMPLEX_KEY_HASHED_ARRAY())
|
||||||
SETTINGS(dictionary_use_async_executor=1, max_threads=8);
|
SETTINGS(dictionary_use_async_executor=1, max_threads=8);
|
||||||
```
|
```
|
||||||
|
|
||||||
|
## storage_metadata_write_full_object_key {#storage_metadata_write_full_object_key}
|
||||||
|
|
||||||
|
When set to `true` the metadata files are written with `VERSION_FULL_OBJECT_KEY` format version. With that format full object storage key names are written to the metadata files.
|
||||||
|
When set to `false` the metadata files are written with the previous format version, `VERSION_INLINE_DATA`. With that format only suffixes of object storage key names are are written to the metadata files. The prefix for all of object storage key names is set in configurations files at `storage_configuration.disks` section.
|
||||||
|
|
||||||
|
Default value: `false`.
|
||||||
|
68
src/Common/ObjectStorageKey.cpp
Normal file
68
src/Common/ObjectStorageKey.cpp
Normal file
@ -0,0 +1,68 @@
|
|||||||
|
#include "ObjectStorageKey.h"
|
||||||
|
|
||||||
|
#include <Common/Exception.h>
|
||||||
|
|
||||||
|
#include <filesystem>
|
||||||
|
|
||||||
|
namespace fs = std::filesystem;
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
|
||||||
|
namespace ErrorCodes
|
||||||
|
{
|
||||||
|
extern const int LOGICAL_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
const String & ObjectStorageKey::getPrefix() const
|
||||||
|
{
|
||||||
|
if (!is_relative)
|
||||||
|
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "object key has no prefix, key: {}", key);
|
||||||
|
|
||||||
|
return prefix;
|
||||||
|
}
|
||||||
|
|
||||||
|
const String & ObjectStorageKey::getSuffix() const
|
||||||
|
{
|
||||||
|
if (!is_relative)
|
||||||
|
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "object key has no suffix, key: {}", key);
|
||||||
|
return suffix;
|
||||||
|
}
|
||||||
|
|
||||||
|
const String & ObjectStorageKey::serialize() const
|
||||||
|
{
|
||||||
|
return key;
|
||||||
|
}
|
||||||
|
|
||||||
|
ObjectStorageKey ObjectStorageKey::createAsRelativeAnyway(String key_)
|
||||||
|
{
|
||||||
|
ObjectStorageKey object_key;
|
||||||
|
object_key.suffix = std::move(key_);
|
||||||
|
object_key.key = object_key.suffix;
|
||||||
|
object_key.is_relative = true;
|
||||||
|
return object_key;
|
||||||
|
}
|
||||||
|
|
||||||
|
ObjectStorageKey ObjectStorageKey::createAsRelative(String prefix_, String suffix_)
|
||||||
|
{
|
||||||
|
ObjectStorageKey object_key;
|
||||||
|
object_key.prefix = std::move(prefix_);
|
||||||
|
object_key.suffix = std::move(suffix_);
|
||||||
|
|
||||||
|
if (object_key.prefix.empty())
|
||||||
|
object_key.key = object_key.suffix;
|
||||||
|
else
|
||||||
|
object_key.key = fs::path(object_key.prefix) / object_key.suffix;
|
||||||
|
|
||||||
|
object_key.is_relative = true;
|
||||||
|
return object_key;
|
||||||
|
}
|
||||||
|
|
||||||
|
ObjectStorageKey ObjectStorageKey::createAsAbsolute(String key_)
|
||||||
|
{
|
||||||
|
ObjectStorageKey object_key;
|
||||||
|
object_key.key = std::move(key_);
|
||||||
|
object_key.is_relative = true;
|
||||||
|
return object_key;
|
||||||
|
}
|
||||||
|
}
|
29
src/Common/ObjectStorageKey.h
Normal file
29
src/Common/ObjectStorageKey.h
Normal file
@ -0,0 +1,29 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <base/types.h>
|
||||||
|
|
||||||
|
#include <memory>
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
struct ObjectStorageKey
|
||||||
|
{
|
||||||
|
ObjectStorageKey() = default;
|
||||||
|
|
||||||
|
bool hasPrefix() const { return is_relative; }
|
||||||
|
const String & getPrefix() const;
|
||||||
|
const String & getSuffix() const;
|
||||||
|
const String & serialize() const;
|
||||||
|
|
||||||
|
static ObjectStorageKey createAsRelative(String prefix_, String suffix_);
|
||||||
|
static ObjectStorageKey createAsRelativeAnyway(String key_);
|
||||||
|
static ObjectStorageKey createAsAbsolute(String key_);
|
||||||
|
|
||||||
|
private:
|
||||||
|
String prefix;
|
||||||
|
String suffix;
|
||||||
|
String key;
|
||||||
|
bool is_relative = false;
|
||||||
|
};
|
||||||
|
|
||||||
|
}
|
@ -289,6 +289,7 @@ class IColumn;
|
|||||||
M(UInt64, http_response_buffer_size, 0, "The number of bytes to buffer in the server memory before sending a HTTP response to the client or flushing to disk (when http_wait_end_of_query is enabled).", 0) \
|
M(UInt64, http_response_buffer_size, 0, "The number of bytes to buffer in the server memory before sending a HTTP response to the client or flushing to disk (when http_wait_end_of_query is enabled).", 0) \
|
||||||
\
|
\
|
||||||
M(Bool, fsync_metadata, true, "Do fsync after changing metadata for tables and databases (.sql files). Could be disabled in case of poor latency on server with high load of DDL queries and high load of disk subsystem.", 0) \
|
M(Bool, fsync_metadata, true, "Do fsync after changing metadata for tables and databases (.sql files). Could be disabled in case of poor latency on server with high load of DDL queries and high load of disk subsystem.", 0) \
|
||||||
|
M(Bool, storage_metadata_write_full_object_key, false, "Enable write metadata files with VERSION_FULL_OBJECT_KEY format", 0) \
|
||||||
\
|
\
|
||||||
M(Bool, join_use_nulls, false, "Use NULLs for non-joined rows of outer JOINs for types that can be inside Nullable. If false, use default value of corresponding columns data type.", IMPORTANT) \
|
M(Bool, join_use_nulls, false, "Use NULLs for non-joined rows of outer JOINs for types that can be inside Nullable. If false, use default value of corresponding columns data type.", IMPORTANT) \
|
||||||
\
|
\
|
||||||
|
@ -302,12 +302,14 @@ public:
|
|||||||
struct LocalPathWithObjectStoragePaths
|
struct LocalPathWithObjectStoragePaths
|
||||||
{
|
{
|
||||||
std::string local_path;
|
std::string local_path;
|
||||||
std::string common_prefix_for_objects;
|
|
||||||
StoredObjects objects;
|
StoredObjects objects;
|
||||||
|
|
||||||
LocalPathWithObjectStoragePaths(
|
LocalPathWithObjectStoragePaths(
|
||||||
const std::string & local_path_, const std::string & common_prefix_for_objects_, StoredObjects && objects_)
|
const std::string & local_path_,
|
||||||
: local_path(local_path_), common_prefix_for_objects(common_prefix_for_objects_), objects(std::move(objects_)) {}
|
StoredObjects && objects_)
|
||||||
|
: local_path(local_path_)
|
||||||
|
, objects(std::move(objects_))
|
||||||
|
{}
|
||||||
};
|
};
|
||||||
|
|
||||||
virtual void getRemotePathsRecursive(const String &, std::vector<LocalPathWithObjectStoragePaths> &)
|
virtual void getRemotePathsRecursive(const String &, std::vector<LocalPathWithObjectStoragePaths> &)
|
||||||
|
@ -102,9 +102,9 @@ AzureObjectStorage::AzureObjectStorage(
|
|||||||
data_source_description.is_encrypted = false;
|
data_source_description.is_encrypted = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::string AzureObjectStorage::generateBlobNameForPath(const std::string & /* path */)
|
ObjectStorageKey AzureObjectStorage::generateObjectKeyForPath(const std::string & /* path */) const
|
||||||
{
|
{
|
||||||
return getRandomASCIIString(32);
|
return ObjectStorageKey::createAsRelativeAnyway(getRandomASCIIString(32));
|
||||||
}
|
}
|
||||||
|
|
||||||
bool AzureObjectStorage::exists(const StoredObject & object) const
|
bool AzureObjectStorage::exists(const StoredObject & object) const
|
||||||
@ -320,18 +320,7 @@ void AzureObjectStorage::removeObjectsIfExist(const StoredObjects & objects)
|
|||||||
auto client_ptr = client.get();
|
auto client_ptr = client.get();
|
||||||
for (const auto & object : objects)
|
for (const auto & object : objects)
|
||||||
{
|
{
|
||||||
try
|
removeObjectIfExists(object);
|
||||||
{
|
|
||||||
auto delete_info = client_ptr->DeleteBlob(object.remote_path);
|
|
||||||
}
|
|
||||||
catch (const Azure::Storage::StorageException & e)
|
|
||||||
{
|
|
||||||
/// If object doesn't exist...
|
|
||||||
if (e.StatusCode == Azure::Core::Http::HttpStatusCode::NotFound)
|
|
||||||
return;
|
|
||||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
|
||||||
throw;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -121,7 +121,7 @@ public:
|
|||||||
const std::string & config_prefix,
|
const std::string & config_prefix,
|
||||||
ContextPtr context) override;
|
ContextPtr context) override;
|
||||||
|
|
||||||
std::string generateBlobNameForPath(const std::string & path) override;
|
ObjectStorageKey generateObjectKeyForPath(const std::string & path) const override;
|
||||||
|
|
||||||
bool isRemote() const override { return true; }
|
bool isRemote() const override { return true; }
|
||||||
|
|
||||||
|
@ -31,11 +31,12 @@ void registerDiskAzureBlobStorage(DiskFactory & factory, bool global_skip_access
|
|||||||
getAzureBlobContainerClient(config, config_prefix),
|
getAzureBlobContainerClient(config, config_prefix),
|
||||||
getAzureBlobStorageSettings(config, config_prefix, context));
|
getAzureBlobStorageSettings(config, config_prefix, context));
|
||||||
|
|
||||||
auto metadata_storage = std::make_shared<MetadataStorageFromDisk>(metadata_disk, "");
|
String key_prefix;
|
||||||
|
auto metadata_storage = std::make_shared<MetadataStorageFromDisk>(metadata_disk, key_prefix);
|
||||||
|
|
||||||
std::shared_ptr<IDisk> azure_blob_storage_disk = std::make_shared<DiskObjectStorage>(
|
std::shared_ptr<IDisk> azure_blob_storage_disk = std::make_shared<DiskObjectStorage>(
|
||||||
name,
|
name,
|
||||||
/* no namespaces */"",
|
/* no namespaces */ key_prefix,
|
||||||
"DiskAzureBlobStorage",
|
"DiskAzureBlobStorage",
|
||||||
std::move(metadata_storage),
|
std::move(metadata_storage),
|
||||||
std::move(azure_object_storage),
|
std::move(azure_object_storage),
|
||||||
|
@ -42,9 +42,9 @@ FileCache::Key CachedObjectStorage::getCacheKey(const std::string & path) const
|
|||||||
return cache->createKeyForPath(path);
|
return cache->createKeyForPath(path);
|
||||||
}
|
}
|
||||||
|
|
||||||
std::string CachedObjectStorage::generateBlobNameForPath(const std::string & path)
|
ObjectStorageKey CachedObjectStorage::generateObjectKeyForPath(const std::string & path) const
|
||||||
{
|
{
|
||||||
return object_storage->generateBlobNameForPath(path);
|
return object_storage->generateObjectKeyForPath(path);
|
||||||
}
|
}
|
||||||
|
|
||||||
ReadSettings CachedObjectStorage::patchSettings(const ReadSettings & read_settings) const
|
ReadSettings CachedObjectStorage::patchSettings(const ReadSettings & read_settings) const
|
||||||
|
@ -92,7 +92,7 @@ public:
|
|||||||
|
|
||||||
const std::string & getCacheName() const override { return cache_config_name; }
|
const std::string & getCacheName() const override { return cache_config_name; }
|
||||||
|
|
||||||
std::string generateBlobNameForPath(const std::string & path) override;
|
ObjectStorageKey generateObjectKeyForPath(const std::string & path) const override;
|
||||||
|
|
||||||
bool isRemote() const override { return object_storage->isRemote(); }
|
bool isRemote() const override { return object_storage->isRemote(); }
|
||||||
|
|
||||||
|
@ -48,14 +48,14 @@ DiskTransactionPtr DiskObjectStorage::createObjectStorageTransaction()
|
|||||||
|
|
||||||
DiskObjectStorage::DiskObjectStorage(
|
DiskObjectStorage::DiskObjectStorage(
|
||||||
const String & name_,
|
const String & name_,
|
||||||
const String & object_storage_root_path_,
|
const String & object_key_prefix_,
|
||||||
const String & log_name,
|
const String & log_name,
|
||||||
MetadataStoragePtr metadata_storage_,
|
MetadataStoragePtr metadata_storage_,
|
||||||
ObjectStoragePtr object_storage_,
|
ObjectStoragePtr object_storage_,
|
||||||
const Poco::Util::AbstractConfiguration & config,
|
const Poco::Util::AbstractConfiguration & config,
|
||||||
const String & config_prefix)
|
const String & config_prefix)
|
||||||
: IDisk(name_, config, config_prefix)
|
: IDisk(name_, config, config_prefix)
|
||||||
, object_storage_root_path(object_storage_root_path_)
|
, object_key_prefix(object_key_prefix_)
|
||||||
, log (&Poco::Logger::get("DiskObjectStorage(" + log_name + ")"))
|
, log (&Poco::Logger::get("DiskObjectStorage(" + log_name + ")"))
|
||||||
, metadata_storage(std::move(metadata_storage_))
|
, metadata_storage(std::move(metadata_storage_))
|
||||||
, object_storage(std::move(object_storage_))
|
, object_storage(std::move(object_storage_))
|
||||||
@ -80,7 +80,7 @@ void DiskObjectStorage::getRemotePathsRecursive(const String & local_path, std::
|
|||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
paths_map.emplace_back(local_path, metadata_storage->getObjectStorageRootPath(), getStorageObjects(local_path));
|
paths_map.emplace_back(local_path, getStorageObjects(local_path));
|
||||||
}
|
}
|
||||||
catch (const Exception & e)
|
catch (const Exception & e)
|
||||||
{
|
{
|
||||||
@ -243,9 +243,9 @@ String DiskObjectStorage::getUniqueId(const String & path) const
|
|||||||
|
|
||||||
bool DiskObjectStorage::checkUniqueId(const String & id) const
|
bool DiskObjectStorage::checkUniqueId(const String & id) const
|
||||||
{
|
{
|
||||||
if (!id.starts_with(object_storage_root_path))
|
if (!id.starts_with(object_key_prefix))
|
||||||
{
|
{
|
||||||
LOG_DEBUG(log, "Blob with id {} doesn't start with blob storage prefix {}, Stack {}", id, object_storage_root_path, StackTrace().toString());
|
LOG_DEBUG(log, "Blob with id {} doesn't start with blob storage prefix {}, Stack {}", id, object_key_prefix, StackTrace().toString());
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -470,7 +470,7 @@ DiskObjectStoragePtr DiskObjectStorage::createDiskObjectStorage()
|
|||||||
const auto config_prefix = "storage_configuration.disks." + name;
|
const auto config_prefix = "storage_configuration.disks." + name;
|
||||||
return std::make_shared<DiskObjectStorage>(
|
return std::make_shared<DiskObjectStorage>(
|
||||||
getName(),
|
getName(),
|
||||||
object_storage_root_path,
|
object_key_prefix,
|
||||||
getName(),
|
getName(),
|
||||||
metadata_storage,
|
metadata_storage,
|
||||||
object_storage,
|
object_storage,
|
||||||
@ -586,7 +586,7 @@ void DiskObjectStorage::restoreMetadataIfNeeded(
|
|||||||
{
|
{
|
||||||
metadata_helper->restore(config, config_prefix, context);
|
metadata_helper->restore(config, config_prefix, context);
|
||||||
|
|
||||||
auto current_schema_version = metadata_helper->readSchemaVersion(object_storage.get(), object_storage_root_path);
|
auto current_schema_version = metadata_helper->readSchemaVersion(object_storage.get(), object_key_prefix);
|
||||||
if (current_schema_version < DiskObjectStorageRemoteMetadataRestoreHelper::RESTORABLE_SCHEMA_VERSION)
|
if (current_schema_version < DiskObjectStorageRemoteMetadataRestoreHelper::RESTORABLE_SCHEMA_VERSION)
|
||||||
metadata_helper->migrateToRestorableSchema();
|
metadata_helper->migrateToRestorableSchema();
|
||||||
|
|
||||||
|
@ -37,7 +37,7 @@ friend class DiskObjectStorageRemoteMetadataRestoreHelper;
|
|||||||
public:
|
public:
|
||||||
DiskObjectStorage(
|
DiskObjectStorage(
|
||||||
const String & name_,
|
const String & name_,
|
||||||
const String & object_storage_root_path_,
|
const String & object_key_prefix_,
|
||||||
const String & log_name,
|
const String & log_name,
|
||||||
MetadataStoragePtr metadata_storage_,
|
MetadataStoragePtr metadata_storage_,
|
||||||
ObjectStoragePtr object_storage_,
|
ObjectStoragePtr object_storage_,
|
||||||
@ -224,7 +224,7 @@ private:
|
|||||||
String getReadResourceName() const;
|
String getReadResourceName() const;
|
||||||
String getWriteResourceName() const;
|
String getWriteResourceName() const;
|
||||||
|
|
||||||
const String object_storage_root_path;
|
const String object_key_prefix;
|
||||||
Poco::Logger * log;
|
Poco::Logger * log;
|
||||||
|
|
||||||
MetadataStoragePtr metadata_storage;
|
MetadataStoragePtr metadata_storage;
|
||||||
|
@ -7,6 +7,8 @@
|
|||||||
#include <IO/WriteBufferFromFileBase.h>
|
#include <IO/WriteBufferFromFileBase.h>
|
||||||
#include <Common/logger_useful.h>
|
#include <Common/logger_useful.h>
|
||||||
|
|
||||||
|
#include <Interpreters/Context.h>
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
|
|
||||||
@ -17,44 +19,57 @@ namespace ErrorCodes
|
|||||||
|
|
||||||
void DiskObjectStorageMetadata::deserialize(ReadBuffer & buf)
|
void DiskObjectStorageMetadata::deserialize(ReadBuffer & buf)
|
||||||
{
|
{
|
||||||
UInt32 version;
|
|
||||||
readIntText(version, buf);
|
readIntText(version, buf);
|
||||||
|
|
||||||
if (version < VERSION_ABSOLUTE_PATHS || version > VERSION_INLINE_DATA)
|
if (version < VERSION_ABSOLUTE_PATHS || version > VERSION_FULL_OBJECT_KEY)
|
||||||
throw Exception(
|
throw Exception(
|
||||||
ErrorCodes::UNKNOWN_FORMAT,
|
ErrorCodes::UNKNOWN_FORMAT,
|
||||||
"Unknown metadata file version. Path: {}. Version: {}. Maximum expected version: {}",
|
"Unknown metadata file version. Path: {}. Version: {}. Maximum expected version: {}",
|
||||||
common_metadata_path + metadata_file_path, toString(version), toString(VERSION_READ_ONLY_FLAG));
|
metadata_file_path, toString(version), toString(VERSION_FULL_OBJECT_KEY));
|
||||||
|
|
||||||
assertChar('\n', buf);
|
assertChar('\n', buf);
|
||||||
|
|
||||||
UInt32 storage_objects_count;
|
UInt32 keys_count;
|
||||||
readIntText(storage_objects_count, buf);
|
readIntText(keys_count, buf);
|
||||||
assertChar('\t', buf);
|
assertChar('\t', buf);
|
||||||
|
keys_with_meta.resize(keys_count);
|
||||||
|
|
||||||
readIntText(total_size, buf);
|
readIntText(total_size, buf);
|
||||||
assertChar('\n', buf);
|
assertChar('\n', buf);
|
||||||
storage_objects.resize(storage_objects_count);
|
|
||||||
|
|
||||||
for (size_t i = 0; i < storage_objects_count; ++i)
|
for (UInt32 i = 0; i < keys_count; ++i)
|
||||||
{
|
{
|
||||||
String object_relative_path;
|
UInt64 object_size;
|
||||||
size_t object_size;
|
|
||||||
readIntText(object_size, buf);
|
readIntText(object_size, buf);
|
||||||
assertChar('\t', buf);
|
assertChar('\t', buf);
|
||||||
readEscapedString(object_relative_path, buf);
|
|
||||||
if (version == VERSION_ABSOLUTE_PATHS)
|
|
||||||
{
|
|
||||||
if (!object_relative_path.starts_with(object_storage_root_path))
|
|
||||||
throw Exception(ErrorCodes::UNKNOWN_FORMAT,
|
|
||||||
"Path in metadata does not correspond to root path. Path: {}, root path: {}, disk path: {}",
|
|
||||||
object_relative_path, object_storage_root_path, common_metadata_path);
|
|
||||||
|
|
||||||
object_relative_path = object_relative_path.substr(object_storage_root_path.size());
|
keys_with_meta[i].metadata.size_bytes = object_size;
|
||||||
}
|
|
||||||
|
String key_value;
|
||||||
|
readEscapedString(key_value, buf);
|
||||||
assertChar('\n', buf);
|
assertChar('\n', buf);
|
||||||
|
|
||||||
storage_objects[i].relative_path = object_relative_path;
|
if (version == VERSION_ABSOLUTE_PATHS)
|
||||||
storage_objects[i].metadata.size_bytes = object_size;
|
{
|
||||||
|
if (!key_value.starts_with(compatible_key_prefix))
|
||||||
|
throw Exception(
|
||||||
|
ErrorCodes::UNKNOWN_FORMAT,
|
||||||
|
"Path in metadata does not correspond to root path. Path: {}, root path: {}, disk path: {}",
|
||||||
|
key_value,
|
||||||
|
compatible_key_prefix,
|
||||||
|
metadata_file_path);
|
||||||
|
|
||||||
|
keys_with_meta[i].key = ObjectStorageKey::createAsRelative(
|
||||||
|
compatible_key_prefix, key_value.substr(compatible_key_prefix.size()));
|
||||||
|
}
|
||||||
|
else if (version < VERSION_FULL_OBJECT_KEY)
|
||||||
|
{
|
||||||
|
keys_with_meta[i].key = ObjectStorageKey::createAsRelative(compatible_key_prefix, key_value);
|
||||||
|
}
|
||||||
|
else if (version >= VERSION_FULL_OBJECT_KEY)
|
||||||
|
{
|
||||||
|
keys_with_meta[i].key = ObjectStorageKey::createAsAbsolute(key_value);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
readIntText(ref_count, buf);
|
readIntText(ref_count, buf);
|
||||||
@ -73,7 +88,7 @@ void DiskObjectStorageMetadata::deserialize(ReadBuffer & buf)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void DiskObjectStorageMetadata::deserializeFromString(const std::string & data)
|
void DiskObjectStorageMetadata::deserializeFromString(const String & data)
|
||||||
{
|
{
|
||||||
ReadBufferFromString buf(data);
|
ReadBufferFromString buf(data);
|
||||||
deserialize(buf);
|
deserialize(buf);
|
||||||
@ -81,22 +96,56 @@ void DiskObjectStorageMetadata::deserializeFromString(const std::string & data)
|
|||||||
|
|
||||||
void DiskObjectStorageMetadata::serialize(WriteBuffer & buf, bool sync) const
|
void DiskObjectStorageMetadata::serialize(WriteBuffer & buf, bool sync) const
|
||||||
{
|
{
|
||||||
writeIntText(VERSION_INLINE_DATA, buf);
|
/// There are the changes for backward compatibility
|
||||||
|
/// No new file should be write as VERSION_FULL_OBJECT_KEY until storage_metadata_write_full_object_key feature is enabled
|
||||||
|
/// However, in case of rollback, once file had been written as VERSION_FULL_OBJECT_KEY
|
||||||
|
/// it has to be always rewritten as VERSION_FULL_OBJECT_KEY
|
||||||
|
|
||||||
|
bool storage_metadata_write_full_object_key = getWriteFullObjectKeySetting();
|
||||||
|
|
||||||
|
if (version == VERSION_FULL_OBJECT_KEY && !storage_metadata_write_full_object_key)
|
||||||
|
{
|
||||||
|
Poco::Logger * logger = &Poco::Logger::get("DiskObjectStorageMetadata");
|
||||||
|
LOG_WARNING(
|
||||||
|
logger,
|
||||||
|
"Metadata file {} is written with VERSION_FULL_OBJECT_KEY version"
|
||||||
|
"However storage_metadata_write_full_object_key is off.",
|
||||||
|
metadata_file_path);
|
||||||
|
}
|
||||||
|
|
||||||
|
UInt32 write_version = version;
|
||||||
|
if (storage_metadata_write_full_object_key)
|
||||||
|
write_version = VERSION_FULL_OBJECT_KEY;
|
||||||
|
|
||||||
|
chassert(write_version >= VERSION_ABSOLUTE_PATHS && write_version <= VERSION_FULL_OBJECT_KEY);
|
||||||
|
writeIntText(write_version, buf);
|
||||||
writeChar('\n', buf);
|
writeChar('\n', buf);
|
||||||
|
|
||||||
writeIntText(storage_objects.size(), buf);
|
writeIntText(keys_with_meta.size(), buf);
|
||||||
writeChar('\t', buf);
|
writeChar('\t', buf);
|
||||||
writeIntText(total_size, buf);
|
writeIntText(total_size, buf);
|
||||||
writeChar('\n', buf);
|
writeChar('\n', buf);
|
||||||
|
|
||||||
for (const auto & [object_relative_path, object_metadata] : storage_objects)
|
for (const auto & [object_key, object_meta] : keys_with_meta)
|
||||||
{
|
{
|
||||||
writeIntText(object_metadata.size_bytes, buf);
|
writeIntText(object_meta.size_bytes, buf);
|
||||||
writeChar('\t', buf);
|
writeChar('\t', buf);
|
||||||
writeEscapedString(object_relative_path, buf);
|
|
||||||
|
if (write_version == VERSION_FULL_OBJECT_KEY)
|
||||||
|
{
|
||||||
|
/// if the metadata file has VERSION_FULL_OBJECT_KEY version
|
||||||
|
/// all keys inside are written as absolute paths
|
||||||
|
writeEscapedString(object_key.serialize(), buf);
|
||||||
writeChar('\n', buf);
|
writeChar('\n', buf);
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/// otherwise keys are written as relative paths
|
||||||
|
/// therefore keys have to have suffix and prefix
|
||||||
|
writeEscapedString(object_key.getSuffix(), buf);
|
||||||
|
writeChar('\n', buf);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
writeIntText(ref_count, buf);
|
writeIntText(ref_count, buf);
|
||||||
writeChar('\n', buf);
|
writeChar('\n', buf);
|
||||||
@ -104,11 +153,6 @@ void DiskObjectStorageMetadata::serialize(WriteBuffer & buf, bool sync) const
|
|||||||
writeBoolText(read_only, buf);
|
writeBoolText(read_only, buf);
|
||||||
writeChar('\n', buf);
|
writeChar('\n', buf);
|
||||||
|
|
||||||
/// Metadata version describes the format of the file
|
|
||||||
/// It determines the possibility of writing and reading a particular set of fields from the file, no matter the fields' values.
|
|
||||||
/// It should not be dependent on field values.
|
|
||||||
/// We always write inline_data in the file when we declare VERSION_INLINE_DATA as a file version,
|
|
||||||
/// unless it is impossible to introduce the next version of the format.
|
|
||||||
writeEscapedString(inline_data, buf);
|
writeEscapedString(inline_data, buf);
|
||||||
writeChar('\n', buf);
|
writeChar('\n', buf);
|
||||||
|
|
||||||
@ -117,7 +161,7 @@ void DiskObjectStorageMetadata::serialize(WriteBuffer & buf, bool sync) const
|
|||||||
buf.sync();
|
buf.sync();
|
||||||
}
|
}
|
||||||
|
|
||||||
std::string DiskObjectStorageMetadata::serializeToString() const
|
String DiskObjectStorageMetadata::serializeToString() const
|
||||||
{
|
{
|
||||||
WriteBufferFromOwnString result;
|
WriteBufferFromOwnString result;
|
||||||
serialize(result, false);
|
serialize(result, false);
|
||||||
@ -126,20 +170,44 @@ std::string DiskObjectStorageMetadata::serializeToString() const
|
|||||||
|
|
||||||
/// Load metadata by path or create empty if `create` flag is set.
|
/// Load metadata by path or create empty if `create` flag is set.
|
||||||
DiskObjectStorageMetadata::DiskObjectStorageMetadata(
|
DiskObjectStorageMetadata::DiskObjectStorageMetadata(
|
||||||
const std::string & common_metadata_path_,
|
String compatible_key_prefix_,
|
||||||
const String & object_storage_root_path_,
|
String metadata_file_path_)
|
||||||
const String & metadata_file_path_)
|
: compatible_key_prefix(std::move(compatible_key_prefix_))
|
||||||
: common_metadata_path(common_metadata_path_)
|
, metadata_file_path(std::move(metadata_file_path_))
|
||||||
, object_storage_root_path(object_storage_root_path_)
|
|
||||||
, metadata_file_path(metadata_file_path_)
|
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
void DiskObjectStorageMetadata::addObject(const String & path, size_t size)
|
void DiskObjectStorageMetadata::addObject(ObjectStorageKey key, size_t size)
|
||||||
{
|
{
|
||||||
|
if (!key.hasPrefix())
|
||||||
|
{
|
||||||
|
version = VERSION_FULL_OBJECT_KEY;
|
||||||
|
|
||||||
|
bool storage_metadata_write_full_object_key = getWriteFullObjectKeySetting();
|
||||||
|
if (!storage_metadata_write_full_object_key)
|
||||||
|
{
|
||||||
|
Poco::Logger * logger = &Poco::Logger::get("DiskObjectStorageMetadata");
|
||||||
|
LOG_WARNING(
|
||||||
|
logger,
|
||||||
|
"Metadata file {} has at least one key {} without fixed common key prefix."
|
||||||
|
"That forces using VERSION_FULL_OBJECT_KEY version for that metadata file."
|
||||||
|
"However storage_metadata_write_full_object_key is off.",
|
||||||
|
metadata_file_path,
|
||||||
|
key.serialize());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
total_size += size;
|
total_size += size;
|
||||||
storage_objects.emplace_back(path, ObjectMetadata{size, {}, {}});
|
keys_with_meta.emplace_back(std::move(key), ObjectMetadata{size, {}, {}});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool DiskObjectStorageMetadata::getWriteFullObjectKeySetting()
|
||||||
|
{
|
||||||
|
#ifndef CLICKHOUSE_KEEPER_STANDALONE_BUILD
|
||||||
|
return Context::getGlobalContextInstance()->getSettings().storage_metadata_write_full_object_key;
|
||||||
|
#else
|
||||||
|
return false;
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -13,29 +13,30 @@ struct DiskObjectStorageMetadata
|
|||||||
{
|
{
|
||||||
private:
|
private:
|
||||||
/// Metadata file version.
|
/// Metadata file version.
|
||||||
static constexpr uint32_t VERSION_ABSOLUTE_PATHS = 1;
|
static constexpr UInt32 VERSION_ABSOLUTE_PATHS = 1;
|
||||||
static constexpr uint32_t VERSION_RELATIVE_PATHS = 2;
|
static constexpr UInt32 VERSION_RELATIVE_PATHS = 2;
|
||||||
static constexpr uint32_t VERSION_READ_ONLY_FLAG = 3;
|
static constexpr UInt32 VERSION_READ_ONLY_FLAG = 3;
|
||||||
static constexpr uint32_t VERSION_INLINE_DATA = 4;
|
static constexpr UInt32 VERSION_INLINE_DATA = 4;
|
||||||
|
static constexpr UInt32 VERSION_FULL_OBJECT_KEY = 5; /// only for reading data
|
||||||
|
|
||||||
const std::string & common_metadata_path;
|
UInt32 version = VERSION_INLINE_DATA;
|
||||||
|
|
||||||
/// Relative paths of blobs.
|
/// Absolute paths of blobs
|
||||||
RelativePathsWithMetadata storage_objects;
|
ObjectKeysWithMetadata keys_with_meta;
|
||||||
|
|
||||||
const std::string object_storage_root_path;
|
const std::string compatible_key_prefix;
|
||||||
|
|
||||||
/// Relative path to metadata file on local FS.
|
/// Relative path to metadata file on local FS.
|
||||||
const std::string metadata_file_path;
|
const std::string metadata_file_path;
|
||||||
|
|
||||||
/// Total size of all remote FS (S3, HDFS) objects.
|
/// Total size of all remote FS (S3, HDFS) objects.
|
||||||
size_t total_size = 0;
|
UInt64 total_size = 0;
|
||||||
|
|
||||||
/// Number of references (hardlinks) to this metadata file.
|
/// Number of references (hardlinks) to this metadata file.
|
||||||
///
|
///
|
||||||
/// FIXME: Why we are tracking it explicitly, without
|
/// FIXME: Why we are tracking it explicitly, without
|
||||||
/// info from filesystem????
|
/// info from filesystem????
|
||||||
uint32_t ref_count = 0;
|
UInt32 ref_count = 0;
|
||||||
|
|
||||||
/// Flag indicates that file is read only.
|
/// Flag indicates that file is read only.
|
||||||
bool read_only = false;
|
bool read_only = false;
|
||||||
@ -46,11 +47,11 @@ private:
|
|||||||
public:
|
public:
|
||||||
|
|
||||||
DiskObjectStorageMetadata(
|
DiskObjectStorageMetadata(
|
||||||
const std::string & common_metadata_path_,
|
String compatible_key_prefix_,
|
||||||
const std::string & object_storage_root_path_,
|
String metadata_file_path_);
|
||||||
const std::string & metadata_file_path_);
|
|
||||||
|
void addObject(ObjectStorageKey key, size_t size);
|
||||||
|
|
||||||
void addObject(const std::string & path, size_t size);
|
|
||||||
|
|
||||||
void deserialize(ReadBuffer & buf);
|
void deserialize(ReadBuffer & buf);
|
||||||
void deserializeFromString(const std::string & data);
|
void deserializeFromString(const std::string & data);
|
||||||
@ -58,14 +59,9 @@ public:
|
|||||||
void serialize(WriteBuffer & buf, bool sync) const;
|
void serialize(WriteBuffer & buf, bool sync) const;
|
||||||
std::string serializeToString() const;
|
std::string serializeToString() const;
|
||||||
|
|
||||||
std::string getBlobsCommonPrefix() const
|
const ObjectKeysWithMetadata & getKeysWithMeta() const
|
||||||
{
|
{
|
||||||
return object_storage_root_path;
|
return keys_with_meta;
|
||||||
}
|
|
||||||
|
|
||||||
RelativePathsWithMetadata getBlobsRelativePaths() const
|
|
||||||
{
|
|
||||||
return storage_objects;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool isReadOnly() const
|
bool isReadOnly() const
|
||||||
@ -73,12 +69,12 @@ public:
|
|||||||
return read_only;
|
return read_only;
|
||||||
}
|
}
|
||||||
|
|
||||||
uint32_t getRefCount() const
|
UInt32 getRefCount() const
|
||||||
{
|
{
|
||||||
return ref_count;
|
return ref_count;
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t getTotalSizeBytes() const
|
UInt64 getTotalSizeBytes() const
|
||||||
{
|
{
|
||||||
return total_size;
|
return total_size;
|
||||||
}
|
}
|
||||||
@ -112,6 +108,8 @@ public:
|
|||||||
{
|
{
|
||||||
return inline_data;
|
return inline_data;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool getWriteFullObjectKeySetting();
|
||||||
};
|
};
|
||||||
|
|
||||||
using DiskObjectStorageMetadataPtr = std::unique_ptr<DiskObjectStorageMetadata>;
|
using DiskObjectStorageMetadataPtr = std::unique_ptr<DiskObjectStorageMetadata>;
|
||||||
|
@ -34,7 +34,7 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::createFileOperationObject(
|
|||||||
const String & operation_name, UInt64 revision, const ObjectAttributes & metadata) const
|
const String & operation_name, UInt64 revision, const ObjectAttributes & metadata) const
|
||||||
{
|
{
|
||||||
const String relative_path = "operations/r" + revisionToString(revision) + operation_log_suffix + "-" + operation_name;
|
const String relative_path = "operations/r" + revisionToString(revision) + operation_log_suffix + "-" + operation_name;
|
||||||
StoredObject object(fs::path(disk->object_storage_root_path) / relative_path);
|
StoredObject object(fs::path(disk->object_key_prefix) / relative_path);
|
||||||
auto buf = disk->object_storage->writeObject(object, WriteMode::Rewrite, metadata);
|
auto buf = disk->object_storage->writeObject(object, WriteMode::Rewrite, metadata);
|
||||||
buf->write('0');
|
buf->write('0');
|
||||||
buf->finalize();
|
buf->finalize();
|
||||||
@ -52,8 +52,8 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::findLastRevision()
|
|||||||
LOG_TRACE(disk->log, "Check object exists with revision prefix {}", revision_prefix);
|
LOG_TRACE(disk->log, "Check object exists with revision prefix {}", revision_prefix);
|
||||||
|
|
||||||
const auto & object_storage = disk->object_storage;
|
const auto & object_storage = disk->object_storage;
|
||||||
StoredObject revision_object{disk->object_storage_root_path + "r" + revision_prefix};
|
StoredObject revision_object{disk->object_key_prefix + "r" + revision_prefix};
|
||||||
StoredObject revision_operation_object{disk->object_storage_root_path + "operations/r" + revision_prefix};
|
StoredObject revision_operation_object{disk->object_key_prefix + "operations/r" + revision_prefix};
|
||||||
|
|
||||||
/// Check file or operation with such revision prefix exists.
|
/// Check file or operation with such revision prefix exists.
|
||||||
if (object_storage->exists(revision_object) || object_storage->exists(revision_operation_object))
|
if (object_storage->exists(revision_object) || object_storage->exists(revision_operation_object))
|
||||||
@ -80,7 +80,7 @@ int DiskObjectStorageRemoteMetadataRestoreHelper::readSchemaVersion(IObjectStora
|
|||||||
|
|
||||||
void DiskObjectStorageRemoteMetadataRestoreHelper::saveSchemaVersion(const int & version) const
|
void DiskObjectStorageRemoteMetadataRestoreHelper::saveSchemaVersion(const int & version) const
|
||||||
{
|
{
|
||||||
StoredObject object{fs::path(disk->object_storage_root_path) / SCHEMA_VERSION_OBJECT};
|
StoredObject object{fs::path(disk->object_key_prefix) / SCHEMA_VERSION_OBJECT};
|
||||||
|
|
||||||
auto buf = disk->object_storage->writeObject(object, WriteMode::Rewrite, /* attributes= */ {}, /* buf_size= */ DBMS_DEFAULT_BUFFER_SIZE, write_settings);
|
auto buf = disk->object_storage->writeObject(object, WriteMode::Rewrite, /* attributes= */ {}, /* buf_size= */ DBMS_DEFAULT_BUFFER_SIZE, write_settings);
|
||||||
writeIntText(version, *buf);
|
writeIntText(version, *buf);
|
||||||
@ -187,7 +187,7 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::restore(const Poco::Util::Abs
|
|||||||
try
|
try
|
||||||
{
|
{
|
||||||
RestoreInformation information;
|
RestoreInformation information;
|
||||||
information.source_path = disk->object_storage_root_path;
|
information.source_path = disk->object_key_prefix;
|
||||||
information.source_namespace = disk->object_storage->getObjectsNamespace();
|
information.source_namespace = disk->object_storage->getObjectsNamespace();
|
||||||
|
|
||||||
readRestoreInformation(information);
|
readRestoreInformation(information);
|
||||||
@ -201,11 +201,11 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::restore(const Poco::Util::Abs
|
|||||||
{
|
{
|
||||||
/// In this case we need to additionally cleanup S3 from objects with later revision.
|
/// In this case we need to additionally cleanup S3 from objects with later revision.
|
||||||
/// Will be simply just restore to different path.
|
/// Will be simply just restore to different path.
|
||||||
if (information.source_path == disk->object_storage_root_path && information.revision != LATEST_REVISION)
|
if (information.source_path == disk->object_key_prefix && information.revision != LATEST_REVISION)
|
||||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Restoring to the same bucket and path is allowed if revision is latest (0)");
|
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Restoring to the same bucket and path is allowed if revision is latest (0)");
|
||||||
|
|
||||||
/// This case complicates S3 cleanup in case of unsuccessful restore.
|
/// This case complicates S3 cleanup in case of unsuccessful restore.
|
||||||
if (information.source_path != disk->object_storage_root_path && disk->object_storage_root_path.starts_with(information.source_path))
|
if (information.source_path != disk->object_key_prefix && disk->object_key_prefix.starts_with(information.source_path))
|
||||||
throw Exception(
|
throw Exception(
|
||||||
ErrorCodes::BAD_ARGUMENTS,
|
ErrorCodes::BAD_ARGUMENTS,
|
||||||
"Restoring to the same bucket is allowed only if source path is not a sub-path of configured path in S3 disk");
|
"Restoring to the same bucket is allowed only if source path is not a sub-path of configured path in S3 disk");
|
||||||
@ -224,7 +224,7 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::restore(const Poco::Util::Abs
|
|||||||
|
|
||||||
LOG_INFO(disk->log, "Removing old metadata...");
|
LOG_INFO(disk->log, "Removing old metadata...");
|
||||||
|
|
||||||
bool cleanup_s3 = information.source_path != disk->object_storage_root_path;
|
bool cleanup_s3 = information.source_path != disk->object_key_prefix;
|
||||||
for (const auto & root : data_roots)
|
for (const auto & root : data_roots)
|
||||||
if (disk->exists(root))
|
if (disk->exists(root))
|
||||||
disk->removeSharedRecursive(root + '/', !cleanup_s3, {});
|
disk->removeSharedRecursive(root + '/', !cleanup_s3, {});
|
||||||
@ -424,18 +424,17 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::processRestoreFiles(
|
|||||||
continue;
|
continue;
|
||||||
|
|
||||||
disk->createDirectories(directoryPath(path));
|
disk->createDirectories(directoryPath(path));
|
||||||
auto relative_key = shrinkKey(source_path, key);
|
auto object_key = ObjectStorageKey::createAsRelative(disk->object_key_prefix, shrinkKey(source_path, key));
|
||||||
auto full_path = fs::path(disk->object_storage_root_path) / relative_key;
|
|
||||||
|
|
||||||
StoredObject object_from{key};
|
StoredObject object_from{key};
|
||||||
StoredObject object_to{fs::path(disk->object_storage_root_path) / relative_key};
|
StoredObject object_to{object_key.serialize()};
|
||||||
|
|
||||||
/// Copy object if we restore to different bucket / path.
|
/// Copy object if we restore to different bucket / path.
|
||||||
if (source_object_storage->getObjectsNamespace() != disk->object_storage->getObjectsNamespace() || disk->object_storage_root_path != source_path)
|
if (source_object_storage->getObjectsNamespace() != disk->object_storage->getObjectsNamespace() || disk->object_key_prefix != source_path)
|
||||||
source_object_storage->copyObjectToAnotherObjectStorage(object_from, object_to, read_settings, write_settings, *disk->object_storage);
|
source_object_storage->copyObjectToAnotherObjectStorage(object_from, object_to, read_settings, write_settings, *disk->object_storage);
|
||||||
|
|
||||||
auto tx = disk->metadata_storage->createTransaction();
|
auto tx = disk->metadata_storage->createTransaction();
|
||||||
tx->addBlobToMetadata(path, relative_key, meta.size_bytes);
|
tx->addBlobToMetadata(path, object_key, meta.size_bytes);
|
||||||
tx->commit();
|
tx->commit();
|
||||||
|
|
||||||
LOG_TRACE(disk->log, "Restored file {}", path);
|
LOG_TRACE(disk->log, "Restored file {}", path);
|
||||||
@ -464,7 +463,7 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::restoreFileOperations(IObject
|
|||||||
{
|
{
|
||||||
/// Enable recording file operations if we restore to different bucket / path.
|
/// Enable recording file operations if we restore to different bucket / path.
|
||||||
bool send_metadata = source_object_storage->getObjectsNamespace() != disk->object_storage->getObjectsNamespace()
|
bool send_metadata = source_object_storage->getObjectsNamespace() != disk->object_storage->getObjectsNamespace()
|
||||||
|| disk->object_storage_root_path != restore_information.source_path;
|
|| disk->object_key_prefix != restore_information.source_path;
|
||||||
|
|
||||||
std::set<String> renames;
|
std::set<String> renames;
|
||||||
auto restore_file_operations = [this, &source_object_storage, &restore_information, &renames, &send_metadata](const RelativePathsWithMetadata & objects)
|
auto restore_file_operations = [this, &source_object_storage, &restore_information, &renames, &send_metadata](const RelativePathsWithMetadata & objects)
|
||||||
|
@ -25,6 +25,7 @@ namespace ErrorCodes
|
|||||||
extern const int BAD_FILE_TYPE;
|
extern const int BAD_FILE_TYPE;
|
||||||
extern const int FILE_ALREADY_EXISTS;
|
extern const int FILE_ALREADY_EXISTS;
|
||||||
extern const int CANNOT_PARSE_INPUT_ASSERTION_FAILED;
|
extern const int CANNOT_PARSE_INPUT_ASSERTION_FAILED;
|
||||||
|
extern const int LOGICAL_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
DiskObjectStorageTransaction::DiskObjectStorageTransaction(
|
DiskObjectStorageTransaction::DiskObjectStorageTransaction(
|
||||||
@ -511,12 +512,12 @@ struct CopyFileObjectStorageOperation final : public IDiskObjectStorageOperation
|
|||||||
|
|
||||||
for (const auto & object_from : source_blobs)
|
for (const auto & object_from : source_blobs)
|
||||||
{
|
{
|
||||||
std::string blob_name = object_storage.generateBlobNameForPath(to_path);
|
auto object_key = object_storage.generateObjectKeyForPath(to_path);
|
||||||
auto object_to = StoredObject(fs::path(metadata_storage.getObjectStorageRootPath()) / blob_name);
|
auto object_to = StoredObject(object_key.serialize());
|
||||||
|
|
||||||
object_storage.copyObject(object_from, object_to, read_settings, write_settings);
|
object_storage.copyObject(object_from, object_to, read_settings, write_settings);
|
||||||
|
|
||||||
tx->addBlobToMetadata(to_path, blob_name, object_from.bytes_size);
|
tx->addBlobToMetadata(to_path, object_key, object_from.bytes_size);
|
||||||
|
|
||||||
created_objects.push_back(object_to);
|
created_objects.push_back(object_to);
|
||||||
}
|
}
|
||||||
@ -663,46 +664,53 @@ std::unique_ptr<WriteBufferFromFileBase> DiskObjectStorageTransaction::writeFile
|
|||||||
const WriteSettings & settings,
|
const WriteSettings & settings,
|
||||||
bool autocommit)
|
bool autocommit)
|
||||||
{
|
{
|
||||||
String blob_name;
|
auto object_key = object_storage.generateObjectKeyForPath(path);
|
||||||
std::optional<ObjectAttributes> object_attributes;
|
std::optional<ObjectAttributes> object_attributes;
|
||||||
|
|
||||||
blob_name = object_storage.generateBlobNameForPath(path);
|
|
||||||
if (metadata_helper)
|
if (metadata_helper)
|
||||||
{
|
{
|
||||||
|
if (!object_key.hasPrefix())
|
||||||
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "metadata helper is not supported with abs paths");
|
||||||
|
|
||||||
auto revision = metadata_helper->revision_counter + 1;
|
auto revision = metadata_helper->revision_counter + 1;
|
||||||
metadata_helper->revision_counter++;
|
metadata_helper->revision_counter++;
|
||||||
object_attributes = {
|
object_attributes = {
|
||||||
{"path", path}
|
{"path", path}
|
||||||
};
|
};
|
||||||
blob_name = "r" + revisionToString(revision) + "-file-" + blob_name;
|
|
||||||
|
object_key = ObjectStorageKey::createAsRelative(
|
||||||
|
object_key.getPrefix(),
|
||||||
|
"r" + revisionToString(revision) + "-file-" + object_key.getSuffix());
|
||||||
}
|
}
|
||||||
|
|
||||||
auto object = StoredObject(fs::path(metadata_storage.getObjectStorageRootPath()) / blob_name);
|
/// seems ok
|
||||||
auto write_operation = std::make_unique<WriteFileObjectStorageOperation>(object_storage, metadata_storage, object);
|
auto object = StoredObject(object_key.serialize());
|
||||||
std::function<void(size_t count)> create_metadata_callback;
|
std::function<void(size_t count)> create_metadata_callback;
|
||||||
|
|
||||||
if (autocommit)
|
if (autocommit)
|
||||||
{
|
{
|
||||||
create_metadata_callback = [tx = shared_from_this(), mode, path, blob_name](size_t count)
|
create_metadata_callback = [tx = shared_from_this(), mode, path, object_key](size_t count)
|
||||||
{
|
{
|
||||||
if (mode == WriteMode::Rewrite)
|
if (mode == WriteMode::Rewrite)
|
||||||
{
|
{
|
||||||
// Otherwise we will produce lost blobs which nobody points to
|
/// Otherwise we will produce lost blobs which nobody points to
|
||||||
/// WriteOnce storages are not affected by the issue
|
/// WriteOnce storages are not affected by the issue
|
||||||
if (!tx->object_storage.isWriteOnce() && tx->metadata_storage.exists(path))
|
if (!tx->object_storage.isWriteOnce() && tx->metadata_storage.exists(path))
|
||||||
tx->object_storage.removeObjectsIfExist(tx->metadata_storage.getStorageObjects(path));
|
tx->object_storage.removeObjectsIfExist(tx->metadata_storage.getStorageObjects(path));
|
||||||
|
|
||||||
tx->metadata_transaction->createMetadataFile(path, blob_name, count);
|
tx->metadata_transaction->createMetadataFile(path, object_key, count);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
tx->metadata_transaction->addBlobToMetadata(path, blob_name, count);
|
tx->metadata_transaction->addBlobToMetadata(path, object_key, count);
|
||||||
|
|
||||||
tx->metadata_transaction->commit();
|
tx->metadata_transaction->commit();
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
create_metadata_callback = [object_storage_tx = shared_from_this(), write_op = write_operation.get(), mode, path, blob_name](size_t count)
|
auto write_operation = std::make_unique<WriteFileObjectStorageOperation>(object_storage, metadata_storage, object);
|
||||||
|
|
||||||
|
create_metadata_callback = [object_storage_tx = shared_from_this(), write_op = write_operation.get(), mode, path, object_key](size_t count)
|
||||||
{
|
{
|
||||||
/// This callback called in WriteBuffer finalize method -- only there we actually know
|
/// This callback called in WriteBuffer finalize method -- only there we actually know
|
||||||
/// how many bytes were written. We don't control when this finalize method will be called
|
/// how many bytes were written. We don't control when this finalize method will be called
|
||||||
@ -714,7 +722,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskObjectStorageTransaction::writeFile
|
|||||||
/// ...
|
/// ...
|
||||||
/// buf1->finalize() // shouldn't do anything with metadata operations, just memoize what to do
|
/// buf1->finalize() // shouldn't do anything with metadata operations, just memoize what to do
|
||||||
/// tx->commit()
|
/// tx->commit()
|
||||||
write_op->setOnExecute([object_storage_tx, mode, path, blob_name, count](MetadataTransactionPtr tx)
|
write_op->setOnExecute([object_storage_tx, mode, path, object_key, count](MetadataTransactionPtr tx)
|
||||||
{
|
{
|
||||||
if (mode == WriteMode::Rewrite)
|
if (mode == WriteMode::Rewrite)
|
||||||
{
|
{
|
||||||
@ -726,15 +734,16 @@ std::unique_ptr<WriteBufferFromFileBase> DiskObjectStorageTransaction::writeFile
|
|||||||
object_storage_tx->metadata_storage.getStorageObjects(path));
|
object_storage_tx->metadata_storage.getStorageObjects(path));
|
||||||
}
|
}
|
||||||
|
|
||||||
tx->createMetadataFile(path, blob_name, count);
|
tx->createMetadataFile(path, object_key, count);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
tx->addBlobToMetadata(path, blob_name, count);
|
tx->addBlobToMetadata(path, object_key, count);
|
||||||
});
|
});
|
||||||
};
|
};
|
||||||
}
|
|
||||||
|
|
||||||
operations_to_execute.emplace_back(std::move(write_operation));
|
operations_to_execute.emplace_back(std::move(write_operation));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
auto impl = object_storage.writeObject(
|
auto impl = object_storage.writeObject(
|
||||||
object,
|
object,
|
||||||
@ -753,20 +762,27 @@ void DiskObjectStorageTransaction::writeFileUsingBlobWritingFunction(
|
|||||||
const String & path, WriteMode mode, WriteBlobFunction && write_blob_function)
|
const String & path, WriteMode mode, WriteBlobFunction && write_blob_function)
|
||||||
{
|
{
|
||||||
/// This function is a simplified and adapted version of DiskObjectStorageTransaction::writeFile().
|
/// This function is a simplified and adapted version of DiskObjectStorageTransaction::writeFile().
|
||||||
auto blob_name = object_storage.generateBlobNameForPath(path);
|
auto object_key = object_storage.generateObjectKeyForPath(path);
|
||||||
std::optional<ObjectAttributes> object_attributes;
|
std::optional<ObjectAttributes> object_attributes;
|
||||||
|
|
||||||
if (metadata_helper)
|
if (metadata_helper)
|
||||||
{
|
{
|
||||||
|
if (!object_key.hasPrefix())
|
||||||
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "metadata helper is not supported with abs paths");
|
||||||
|
|
||||||
auto revision = metadata_helper->revision_counter + 1;
|
auto revision = metadata_helper->revision_counter + 1;
|
||||||
metadata_helper->revision_counter++;
|
metadata_helper->revision_counter++;
|
||||||
object_attributes = {
|
object_attributes = {
|
||||||
{"path", path}
|
{"path", path}
|
||||||
};
|
};
|
||||||
blob_name = "r" + revisionToString(revision) + "-file-" + blob_name;
|
|
||||||
|
object_key = ObjectStorageKey::createAsRelative(
|
||||||
|
object_key.getPrefix(),
|
||||||
|
"r" + revisionToString(revision) + "-file-" + object_key.getSuffix());
|
||||||
}
|
}
|
||||||
|
|
||||||
auto object = StoredObject(fs::path(metadata_storage.getObjectStorageRootPath()) / blob_name);
|
/// seems ok
|
||||||
|
auto object = StoredObject(object_key.serialize());
|
||||||
auto write_operation = std::make_unique<WriteFileObjectStorageOperation>(object_storage, metadata_storage, object);
|
auto write_operation = std::make_unique<WriteFileObjectStorageOperation>(object_storage, metadata_storage, object);
|
||||||
|
|
||||||
operations_to_execute.emplace_back(std::move(write_operation));
|
operations_to_execute.emplace_back(std::move(write_operation));
|
||||||
@ -788,10 +804,10 @@ void DiskObjectStorageTransaction::writeFileUsingBlobWritingFunction(
|
|||||||
if (!object_storage.isWriteOnce() && metadata_storage.exists(path))
|
if (!object_storage.isWriteOnce() && metadata_storage.exists(path))
|
||||||
object_storage.removeObjectsIfExist(metadata_storage.getStorageObjects(path));
|
object_storage.removeObjectsIfExist(metadata_storage.getStorageObjects(path));
|
||||||
|
|
||||||
metadata_transaction->createMetadataFile(path, blob_name, object_size);
|
metadata_transaction->createMetadataFile(path, std::move(object_key), object_size);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
metadata_transaction->addBlobToMetadata(path, blob_name, object_size);
|
metadata_transaction->addBlobToMetadata(path, std::move(object_key), object_size);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -28,9 +28,10 @@ void HDFSObjectStorage::startup()
|
|||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
std::string HDFSObjectStorage::generateBlobNameForPath(const std::string & /* path */)
|
ObjectStorageKey HDFSObjectStorage::generateObjectKeyForPath(const std::string & /* path */) const
|
||||||
{
|
{
|
||||||
return getRandomASCIIString(32);
|
/// what ever data_source_description.description value is, consider that key as relative key
|
||||||
|
return ObjectStorageKey::createAsRelative(data_source_description.description, getRandomASCIIString(32));
|
||||||
}
|
}
|
||||||
|
|
||||||
bool HDFSObjectStorage::exists(const StoredObject & object) const
|
bool HDFSObjectStorage::exists(const StoredObject & object) const
|
||||||
|
@ -114,7 +114,7 @@ public:
|
|||||||
const std::string & config_prefix,
|
const std::string & config_prefix,
|
||||||
ContextPtr context) override;
|
ContextPtr context) override;
|
||||||
|
|
||||||
std::string generateBlobNameForPath(const std::string & path) override;
|
ObjectStorageKey generateObjectKeyForPath(const std::string & path) const override;
|
||||||
|
|
||||||
bool isRemote() const override { return true; }
|
bool isRemote() const override { return true; }
|
||||||
|
|
||||||
|
@ -126,10 +126,10 @@ public:
|
|||||||
virtual void createEmptyMetadataFile(const std::string & path) = 0;
|
virtual void createEmptyMetadataFile(const std::string & path) = 0;
|
||||||
|
|
||||||
/// Create metadata file on paths with content (blob_name, size_in_bytes)
|
/// Create metadata file on paths with content (blob_name, size_in_bytes)
|
||||||
virtual void createMetadataFile(const std::string & path, const std::string & blob_name, uint64_t size_in_bytes) = 0;
|
virtual void createMetadataFile(const std::string & path, ObjectStorageKey key, uint64_t size_in_bytes) = 0;
|
||||||
|
|
||||||
/// Add to new blob to metadata file (way to implement appends)
|
/// Add to new blob to metadata file (way to implement appends)
|
||||||
virtual void addBlobToMetadata(const std::string & /* path */, const std::string & /* blob_name */, uint64_t /* size_in_bytes */)
|
virtual void addBlobToMetadata(const std::string & /* path */, ObjectStorageKey /* key */, uint64_t /* size_in_bytes */)
|
||||||
{
|
{
|
||||||
throwNotImplemented();
|
throwNotImplemented();
|
||||||
}
|
}
|
||||||
@ -221,8 +221,6 @@ public:
|
|||||||
/// object_storage_path is absolute.
|
/// object_storage_path is absolute.
|
||||||
virtual StoredObjects getStorageObjects(const std::string & path) const = 0;
|
virtual StoredObjects getStorageObjects(const std::string & path) const = 0;
|
||||||
|
|
||||||
virtual std::string getObjectStorageRootPath() const = 0;
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
[[noreturn]] static void throwNotImplemented()
|
[[noreturn]] static void throwNotImplemented()
|
||||||
{
|
{
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
#include <Disks/ObjectStorages/IObjectStorage.h>
|
#include <Disks/ObjectStorages/IObjectStorage.h>
|
||||||
#include <Disks/IO/ThreadPoolRemoteFSReader.h>
|
#include <Disks/IO/ThreadPoolRemoteFSReader.h>
|
||||||
#include <Common/getRandomASCIIString.h>
|
#include <Common/Exception.h>
|
||||||
#include <IO/WriteBufferFromFileBase.h>
|
#include <IO/WriteBufferFromFileBase.h>
|
||||||
#include <IO/copyData.h>
|
#include <IO/copyData.h>
|
||||||
#include <IO/ReadBufferFromFileBase.h>
|
#include <IO/ReadBufferFromFileBase.h>
|
||||||
@ -95,21 +95,4 @@ WriteSettings IObjectStorage::patchSettings(const WriteSettings & write_settings
|
|||||||
return settings;
|
return settings;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::string IObjectStorage::generateBlobNameForPath(const std::string & /* path */)
|
|
||||||
{
|
|
||||||
/// Path to store the new S3 object.
|
|
||||||
|
|
||||||
/// Total length is 32 a-z characters for enough randomness.
|
|
||||||
/// First 3 characters are used as a prefix for
|
|
||||||
/// https://aws.amazon.com/premiumsupport/knowledge-center/s3-object-key-naming-pattern/
|
|
||||||
|
|
||||||
constexpr size_t key_name_total_size = 32;
|
|
||||||
constexpr size_t key_name_prefix_size = 3;
|
|
||||||
|
|
||||||
/// Path to store new S3 object.
|
|
||||||
return fmt::format("{}/{}",
|
|
||||||
getRandomASCIIString(key_name_prefix_size),
|
|
||||||
getRandomASCIIString(key_name_total_size - key_name_prefix_size));
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -9,7 +9,6 @@
|
|||||||
#include <Poco/Timestamp.h>
|
#include <Poco/Timestamp.h>
|
||||||
#include <Poco/Util/AbstractConfiguration.h>
|
#include <Poco/Util/AbstractConfiguration.h>
|
||||||
#include <Core/Defines.h>
|
#include <Core/Defines.h>
|
||||||
#include <Common/Exception.h>
|
|
||||||
#include <IO/ReadSettings.h>
|
#include <IO/ReadSettings.h>
|
||||||
#include <IO/WriteSettings.h>
|
#include <IO/WriteSettings.h>
|
||||||
#include <IO/copyData.h>
|
#include <IO/copyData.h>
|
||||||
@ -17,6 +16,7 @@
|
|||||||
#include <Disks/ObjectStorages/StoredObject.h>
|
#include <Disks/ObjectStorages/StoredObject.h>
|
||||||
#include <Disks/DiskType.h>
|
#include <Disks/DiskType.h>
|
||||||
#include <Common/ThreadPool_fwd.h>
|
#include <Common/ThreadPool_fwd.h>
|
||||||
|
#include <Common/ObjectStorageKey.h>
|
||||||
#include <Disks/WriteMode.h>
|
#include <Disks/WriteMode.h>
|
||||||
#include <Interpreters/Context_fwd.h>
|
#include <Interpreters/Context_fwd.h>
|
||||||
#include <Core/Types.h>
|
#include <Core/Types.h>
|
||||||
@ -35,7 +35,7 @@ using ObjectAttributes = std::map<std::string, std::string>;
|
|||||||
|
|
||||||
struct ObjectMetadata
|
struct ObjectMetadata
|
||||||
{
|
{
|
||||||
uint64_t size_bytes;
|
uint64_t size_bytes = 0;
|
||||||
std::optional<Poco::Timestamp> last_modified;
|
std::optional<Poco::Timestamp> last_modified;
|
||||||
std::optional<ObjectAttributes> attributes;
|
std::optional<ObjectAttributes> attributes;
|
||||||
};
|
};
|
||||||
@ -43,16 +43,31 @@ struct ObjectMetadata
|
|||||||
struct RelativePathWithMetadata
|
struct RelativePathWithMetadata
|
||||||
{
|
{
|
||||||
String relative_path;
|
String relative_path;
|
||||||
ObjectMetadata metadata{};
|
ObjectMetadata metadata;
|
||||||
|
|
||||||
RelativePathWithMetadata() = default;
|
RelativePathWithMetadata() = default;
|
||||||
|
|
||||||
RelativePathWithMetadata(const String & relative_path_, const ObjectMetadata & metadata_)
|
RelativePathWithMetadata(String relative_path_, ObjectMetadata metadata_)
|
||||||
: relative_path(relative_path_), metadata(metadata_)
|
: relative_path(std::move(relative_path_))
|
||||||
|
, metadata(std::move(metadata_))
|
||||||
|
{}
|
||||||
|
};
|
||||||
|
|
||||||
|
struct ObjectKeyWithMetadata
|
||||||
|
{
|
||||||
|
ObjectStorageKey key;
|
||||||
|
ObjectMetadata metadata;
|
||||||
|
|
||||||
|
ObjectKeyWithMetadata() = default;
|
||||||
|
|
||||||
|
ObjectKeyWithMetadata(ObjectStorageKey key_, ObjectMetadata metadata_)
|
||||||
|
: key(std::move(key_))
|
||||||
|
, metadata(std::move(metadata_))
|
||||||
{}
|
{}
|
||||||
};
|
};
|
||||||
|
|
||||||
using RelativePathsWithMetadata = std::vector<RelativePathWithMetadata>;
|
using RelativePathsWithMetadata = std::vector<RelativePathWithMetadata>;
|
||||||
|
using ObjectKeysWithMetadata = std::vector<ObjectKeyWithMetadata>;
|
||||||
|
|
||||||
class IObjectStorageIterator;
|
class IObjectStorageIterator;
|
||||||
using ObjectStorageIteratorPtr = std::shared_ptr<IObjectStorageIterator>;
|
using ObjectStorageIteratorPtr = std::shared_ptr<IObjectStorageIterator>;
|
||||||
@ -176,7 +191,7 @@ public:
|
|||||||
|
|
||||||
/// Generate blob name for passed absolute local path.
|
/// Generate blob name for passed absolute local path.
|
||||||
/// Path can be generated either independently or based on `path`.
|
/// Path can be generated either independently or based on `path`.
|
||||||
virtual std::string generateBlobNameForPath(const std::string & path);
|
virtual ObjectStorageKey generateObjectKeyForPath(const std::string & path) const = 0;
|
||||||
|
|
||||||
/// Get unique id for passed absolute path in object storage.
|
/// Get unique id for passed absolute path in object storage.
|
||||||
virtual std::string getUniqueId(const std::string & path) const { return path; }
|
virtual std::string getUniqueId(const std::string & path) const { return path; }
|
||||||
|
@ -24,8 +24,9 @@ namespace ErrorCodes
|
|||||||
extern const int CANNOT_UNLINK;
|
extern const int CANNOT_UNLINK;
|
||||||
}
|
}
|
||||||
|
|
||||||
LocalObjectStorage::LocalObjectStorage()
|
LocalObjectStorage::LocalObjectStorage(String key_prefix_)
|
||||||
: log(&Poco::Logger::get("LocalObjectStorage"))
|
: key_prefix(std::move(key_prefix_))
|
||||||
|
, log(&Poco::Logger::get("LocalObjectStorage"))
|
||||||
{
|
{
|
||||||
data_source_description.type = DataSourceType::Local;
|
data_source_description.type = DataSourceType::Local;
|
||||||
if (auto block_device_id = tryGetBlockDeviceId("/"); block_device_id.has_value())
|
if (auto block_device_id = tryGetBlockDeviceId("/"); block_device_id.has_value())
|
||||||
@ -200,10 +201,10 @@ void LocalObjectStorage::applyNewSettings(
|
|||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
std::string LocalObjectStorage::generateBlobNameForPath(const std::string & /* path */)
|
ObjectStorageKey LocalObjectStorage::generateObjectKeyForPath(const std::string & /* path */) const
|
||||||
{
|
{
|
||||||
constexpr size_t key_name_total_size = 32;
|
constexpr size_t key_name_total_size = 32;
|
||||||
return getRandomASCIIString(key_name_total_size);
|
return ObjectStorageKey::createAsRelative(key_prefix, getRandomASCIIString(key_name_total_size));
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -16,7 +16,7 @@ namespace DB
|
|||||||
class LocalObjectStorage : public IObjectStorage
|
class LocalObjectStorage : public IObjectStorage
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
LocalObjectStorage();
|
LocalObjectStorage(String key_prefix_);
|
||||||
|
|
||||||
DataSourceDescription getDataSourceDescription() const override { return data_source_description; }
|
DataSourceDescription getDataSourceDescription() const override { return data_source_description; }
|
||||||
|
|
||||||
@ -78,13 +78,14 @@ public:
|
|||||||
const std::string & config_prefix,
|
const std::string & config_prefix,
|
||||||
ContextPtr context) override;
|
ContextPtr context) override;
|
||||||
|
|
||||||
std::string generateBlobNameForPath(const std::string & path) override;
|
ObjectStorageKey generateObjectKeyForPath(const std::string & path) const override;
|
||||||
|
|
||||||
bool isRemote() const override { return false; }
|
bool isRemote() const override { return false; }
|
||||||
|
|
||||||
ReadSettings patchSettings(const ReadSettings & read_settings) const override;
|
ReadSettings patchSettings(const ReadSettings & read_settings) const override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
String key_prefix;
|
||||||
Poco::Logger * log;
|
Poco::Logger * log;
|
||||||
DataSourceDescription data_source_description;
|
DataSourceDescription data_source_description;
|
||||||
};
|
};
|
||||||
|
@ -20,23 +20,25 @@ void registerDiskLocalObjectStorage(DiskFactory & factory, bool global_skip_acce
|
|||||||
ContextPtr context,
|
ContextPtr context,
|
||||||
const DisksMap & /*map*/) -> DiskPtr
|
const DisksMap & /*map*/) -> DiskPtr
|
||||||
{
|
{
|
||||||
String path;
|
String object_key_prefix;
|
||||||
UInt64 keep_free_space_bytes;
|
UInt64 keep_free_space_bytes;
|
||||||
loadDiskLocalConfig(name, config, config_prefix, context, path, keep_free_space_bytes);
|
loadDiskLocalConfig(name, config, config_prefix, context, object_key_prefix, keep_free_space_bytes);
|
||||||
fs::create_directories(path);
|
/// keys are mapped to the fs, object_key_prefix is a directory also
|
||||||
|
fs::create_directories(object_key_prefix);
|
||||||
|
|
||||||
String type = config.getString(config_prefix + ".type");
|
String type = config.getString(config_prefix + ".type");
|
||||||
chassert(type == "local_blob_storage");
|
chassert(type == "local_blob_storage");
|
||||||
|
|
||||||
std::shared_ptr<LocalObjectStorage> local_storage = std::make_shared<LocalObjectStorage>();
|
std::shared_ptr<LocalObjectStorage> local_storage = std::make_shared<LocalObjectStorage>(object_key_prefix);
|
||||||
MetadataStoragePtr metadata_storage;
|
MetadataStoragePtr metadata_storage;
|
||||||
auto [metadata_path, metadata_disk] = prepareForLocalMetadata(name, config, config_prefix, context);
|
auto [metadata_path, metadata_disk] = prepareForLocalMetadata(name, config, config_prefix, context);
|
||||||
metadata_storage = std::make_shared<MetadataStorageFromDisk>(metadata_disk, path);
|
metadata_storage = std::make_shared<MetadataStorageFromDisk>(metadata_disk, object_key_prefix);
|
||||||
|
|
||||||
auto disk = std::make_shared<DiskObjectStorage>(
|
auto disk = std::make_shared<DiskObjectStorage>(
|
||||||
name, path, "Local", metadata_storage, local_storage, config, config_prefix);
|
name, object_key_prefix, "Local", metadata_storage, local_storage, config, config_prefix);
|
||||||
disk->startup(context, global_skip_access_check);
|
disk->startup(context, global_skip_access_check);
|
||||||
return disk;
|
return disk;
|
||||||
|
|
||||||
};
|
};
|
||||||
factory.registerDiskType("local_blob_storage", creator);
|
factory.registerDiskType("local_blob_storage", creator);
|
||||||
}
|
}
|
||||||
|
@ -15,9 +15,9 @@ namespace ErrorCodes
|
|||||||
extern const int FS_METADATA_ERROR;
|
extern const int FS_METADATA_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
MetadataStorageFromDisk::MetadataStorageFromDisk(DiskPtr disk_, const std::string & object_storage_root_path_)
|
MetadataStorageFromDisk::MetadataStorageFromDisk(DiskPtr disk_, String compatible_key_prefix_)
|
||||||
: disk(disk_)
|
: disk(disk_)
|
||||||
, object_storage_root_path(object_storage_root_path_)
|
, compatible_key_prefix(compatible_key_prefix_)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -85,7 +85,7 @@ std::string MetadataStorageFromDisk::readInlineDataToString(const std::string &
|
|||||||
|
|
||||||
DiskObjectStorageMetadataPtr MetadataStorageFromDisk::readMetadataUnlocked(const std::string & path, std::shared_lock<SharedMutex> &) const
|
DiskObjectStorageMetadataPtr MetadataStorageFromDisk::readMetadataUnlocked(const std::string & path, std::shared_lock<SharedMutex> &) const
|
||||||
{
|
{
|
||||||
auto metadata = std::make_unique<DiskObjectStorageMetadata>(disk->getPath(), object_storage_root_path, path);
|
auto metadata = std::make_unique<DiskObjectStorageMetadata>(compatible_key_prefix, path);
|
||||||
auto str = readFileToString(path);
|
auto str = readFileToString(path);
|
||||||
metadata->deserializeFromString(str);
|
metadata->deserializeFromString(str);
|
||||||
return metadata;
|
return metadata;
|
||||||
@ -93,7 +93,7 @@ DiskObjectStorageMetadataPtr MetadataStorageFromDisk::readMetadataUnlocked(const
|
|||||||
|
|
||||||
DiskObjectStorageMetadataPtr MetadataStorageFromDisk::readMetadataUnlocked(const std::string & path, std::unique_lock<SharedMutex> &) const
|
DiskObjectStorageMetadataPtr MetadataStorageFromDisk::readMetadataUnlocked(const std::string & path, std::unique_lock<SharedMutex> &) const
|
||||||
{
|
{
|
||||||
auto metadata = std::make_unique<DiskObjectStorageMetadata>(disk->getPath(), object_storage_root_path, path);
|
auto metadata = std::make_unique<DiskObjectStorageMetadata>(compatible_key_prefix, path);
|
||||||
auto str = readFileToString(path);
|
auto str = readFileToString(path);
|
||||||
metadata->deserializeFromString(str);
|
metadata->deserializeFromString(str);
|
||||||
return metadata;
|
return metadata;
|
||||||
@ -135,21 +135,16 @@ MetadataTransactionPtr MetadataStorageFromDisk::createTransaction()
|
|||||||
StoredObjects MetadataStorageFromDisk::getStorageObjects(const std::string & path) const
|
StoredObjects MetadataStorageFromDisk::getStorageObjects(const std::string & path) const
|
||||||
{
|
{
|
||||||
auto metadata = readMetadata(path);
|
auto metadata = readMetadata(path);
|
||||||
|
const auto & keys_with_meta = metadata->getKeysWithMeta();
|
||||||
|
|
||||||
auto object_storage_relative_paths = metadata->getBlobsRelativePaths(); /// Relative paths.
|
StoredObjects objects;
|
||||||
|
objects.reserve(keys_with_meta.size());
|
||||||
StoredObjects object_storage_paths;
|
for (const auto & [object_key, object_meta] : keys_with_meta)
|
||||||
object_storage_paths.reserve(object_storage_relative_paths.size());
|
|
||||||
|
|
||||||
/// Relative paths -> absolute.
|
|
||||||
for (auto & [object_relative_path, object_meta] : object_storage_relative_paths)
|
|
||||||
{
|
{
|
||||||
auto object_path = fs::path(metadata->getBlobsCommonPrefix()) / object_relative_path;
|
objects.emplace_back(object_key.serialize(), object_meta.size_bytes, path);
|
||||||
StoredObject object{ object_path, object_meta.size_bytes, path };
|
|
||||||
object_storage_paths.push_back(object);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return object_storage_paths;
|
return objects;
|
||||||
}
|
}
|
||||||
|
|
||||||
uint32_t MetadataStorageFromDisk::getHardlinkCount(const std::string & path) const
|
uint32_t MetadataStorageFromDisk::getHardlinkCount(const std::string & path) const
|
||||||
@ -253,8 +248,7 @@ void MetadataStorageFromDiskTransaction::writeInlineDataToFile(
|
|||||||
const std::string & path,
|
const std::string & path,
|
||||||
const std::string & data)
|
const std::string & data)
|
||||||
{
|
{
|
||||||
auto metadata = std::make_unique<DiskObjectStorageMetadata>(
|
auto metadata = std::make_unique<DiskObjectStorageMetadata>(metadata_storage.compatible_key_prefix, path);
|
||||||
metadata_storage.getDisk()->getPath(), metadata_storage.getObjectStorageRootPath(), path);
|
|
||||||
metadata->setInlineData(data);
|
metadata->setInlineData(data);
|
||||||
writeStringToFile(path, metadata->serializeToString());
|
writeStringToFile(path, metadata->serializeToString());
|
||||||
}
|
}
|
||||||
@ -318,26 +312,23 @@ void MetadataStorageFromDiskTransaction::setReadOnly(const std::string & path)
|
|||||||
|
|
||||||
void MetadataStorageFromDiskTransaction::createEmptyMetadataFile(const std::string & path)
|
void MetadataStorageFromDiskTransaction::createEmptyMetadataFile(const std::string & path)
|
||||||
{
|
{
|
||||||
auto metadata = std::make_unique<DiskObjectStorageMetadata>(
|
auto metadata = std::make_unique<DiskObjectStorageMetadata>(metadata_storage.compatible_key_prefix, path);
|
||||||
metadata_storage.getDisk()->getPath(), metadata_storage.getObjectStorageRootPath(), path);
|
|
||||||
writeStringToFile(path, metadata->serializeToString());
|
writeStringToFile(path, metadata->serializeToString());
|
||||||
}
|
}
|
||||||
|
|
||||||
void MetadataStorageFromDiskTransaction::createMetadataFile(const std::string & path, const std::string & blob_name, uint64_t size_in_bytes)
|
void MetadataStorageFromDiskTransaction::createMetadataFile(const std::string & path, ObjectStorageKey object_key, uint64_t size_in_bytes)
|
||||||
{
|
{
|
||||||
DiskObjectStorageMetadataPtr metadata = std::make_unique<DiskObjectStorageMetadata>(
|
auto metadata = std::make_unique<DiskObjectStorageMetadata>(metadata_storage.compatible_key_prefix, path);
|
||||||
metadata_storage.getDisk()->getPath(), metadata_storage.getObjectStorageRootPath(), path);
|
metadata->addObject(std::move(object_key), size_in_bytes);
|
||||||
|
|
||||||
metadata->addObject(blob_name, size_in_bytes);
|
|
||||||
|
|
||||||
auto data = metadata->serializeToString();
|
auto data = metadata->serializeToString();
|
||||||
if (!data.empty())
|
if (!data.empty())
|
||||||
addOperation(std::make_unique<WriteFileOperation>(path, *metadata_storage.getDisk(), data));
|
addOperation(std::make_unique<WriteFileOperation>(path, *metadata_storage.getDisk(), data));
|
||||||
}
|
}
|
||||||
|
|
||||||
void MetadataStorageFromDiskTransaction::addBlobToMetadata(const std::string & path, const std::string & blob_name, uint64_t size_in_bytes)
|
void MetadataStorageFromDiskTransaction::addBlobToMetadata(const std::string & path, ObjectStorageKey object_key, uint64_t size_in_bytes)
|
||||||
{
|
{
|
||||||
addOperation(std::make_unique<AddBlobOperation>(path, blob_name, metadata_storage.object_storage_root_path, size_in_bytes, *metadata_storage.disk, metadata_storage));
|
addOperation(std::make_unique<AddBlobOperation>(path, std::move(object_key), size_in_bytes, *metadata_storage.disk, metadata_storage));
|
||||||
}
|
}
|
||||||
|
|
||||||
UnlinkMetadataFileOperationOutcomePtr MetadataStorageFromDiskTransaction::unlinkMetadata(const std::string & path)
|
UnlinkMetadataFileOperationOutcomePtr MetadataStorageFromDiskTransaction::unlinkMetadata(const std::string & path)
|
||||||
|
@ -22,12 +22,11 @@ private:
|
|||||||
friend class MetadataStorageFromDiskTransaction;
|
friend class MetadataStorageFromDiskTransaction;
|
||||||
|
|
||||||
mutable SharedMutex metadata_mutex;
|
mutable SharedMutex metadata_mutex;
|
||||||
|
|
||||||
DiskPtr disk;
|
DiskPtr disk;
|
||||||
std::string object_storage_root_path;
|
String compatible_key_prefix;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
MetadataStorageFromDisk(DiskPtr disk_, const std::string & object_storage_root_path_);
|
MetadataStorageFromDisk(DiskPtr disk_, String compatible_key_prefix);
|
||||||
|
|
||||||
MetadataTransactionPtr createTransaction() override;
|
MetadataTransactionPtr createTransaction() override;
|
||||||
|
|
||||||
@ -67,8 +66,6 @@ public:
|
|||||||
|
|
||||||
StoredObjects getStorageObjects(const std::string & path) const override;
|
StoredObjects getStorageObjects(const std::string & path) const override;
|
||||||
|
|
||||||
std::string getObjectStorageRootPath() const override { return object_storage_root_path; }
|
|
||||||
|
|
||||||
DiskObjectStorageMetadataPtr readMetadata(const std::string & path) const;
|
DiskObjectStorageMetadataPtr readMetadata(const std::string & path) const;
|
||||||
|
|
||||||
DiskObjectStorageMetadataPtr readMetadataUnlocked(const std::string & path, std::unique_lock<SharedMutex> & lock) const;
|
DiskObjectStorageMetadataPtr readMetadataUnlocked(const std::string & path, std::unique_lock<SharedMutex> & lock) const;
|
||||||
@ -104,9 +101,9 @@ public:
|
|||||||
|
|
||||||
void createEmptyMetadataFile(const std::string & path) override;
|
void createEmptyMetadataFile(const std::string & path) override;
|
||||||
|
|
||||||
void createMetadataFile(const std::string & path, const std::string & blob_name, uint64_t size_in_bytes) override;
|
void createMetadataFile(const std::string & path, ObjectStorageKey object_key, uint64_t size_in_bytes) override;
|
||||||
|
|
||||||
void addBlobToMetadata(const std::string & path, const std::string & blob_name, uint64_t size_in_bytes) override;
|
void addBlobToMetadata(const std::string & path, ObjectStorageKey object_key, uint64_t size_in_bytes) override;
|
||||||
|
|
||||||
void setLastModified(const std::string & path, const Poco::Timestamp & timestamp) override;
|
void setLastModified(const std::string & path, const Poco::Timestamp & timestamp) override;
|
||||||
|
|
||||||
|
@ -294,9 +294,9 @@ void AddBlobOperation::execute(std::unique_lock<SharedMutex> & metadata_lock)
|
|||||||
if (metadata_storage.exists(path))
|
if (metadata_storage.exists(path))
|
||||||
metadata = metadata_storage.readMetadataUnlocked(path, metadata_lock);
|
metadata = metadata_storage.readMetadataUnlocked(path, metadata_lock);
|
||||||
else
|
else
|
||||||
metadata = std::make_unique<DiskObjectStorageMetadata>(disk.getPath(), root_path, path);
|
metadata = std::make_unique<DiskObjectStorageMetadata>(disk.getPath(), path);
|
||||||
|
|
||||||
metadata->addObject(blob_name, size_in_bytes);
|
metadata->addObject(object_key, size_in_bytes);
|
||||||
|
|
||||||
write_operation = std::make_unique<WriteFileOperation>(path, disk, metadata->serializeToString());
|
write_operation = std::make_unique<WriteFileOperation>(path, disk, metadata->serializeToString());
|
||||||
|
|
||||||
|
@ -216,14 +216,12 @@ struct AddBlobOperation final : public IMetadataOperation
|
|||||||
{
|
{
|
||||||
AddBlobOperation(
|
AddBlobOperation(
|
||||||
const std::string & path_,
|
const std::string & path_,
|
||||||
const std::string & blob_name_,
|
ObjectStorageKey object_key_,
|
||||||
const std::string & root_path_,
|
|
||||||
uint64_t size_in_bytes_,
|
uint64_t size_in_bytes_,
|
||||||
IDisk & disk_,
|
IDisk & disk_,
|
||||||
const MetadataStorageFromDisk & metadata_storage_)
|
const MetadataStorageFromDisk & metadata_storage_)
|
||||||
: path(path_)
|
: path(path_)
|
||||||
, blob_name(blob_name_)
|
, object_key(std::move(object_key_))
|
||||||
, root_path(root_path_)
|
|
||||||
, size_in_bytes(size_in_bytes_)
|
, size_in_bytes(size_in_bytes_)
|
||||||
, disk(disk_)
|
, disk(disk_)
|
||||||
, metadata_storage(metadata_storage_)
|
, metadata_storage(metadata_storage_)
|
||||||
@ -235,8 +233,7 @@ struct AddBlobOperation final : public IMetadataOperation
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
std::string path;
|
std::string path;
|
||||||
std::string blob_name;
|
ObjectStorageKey object_key;
|
||||||
std::string root_path;
|
|
||||||
uint64_t size_in_bytes;
|
uint64_t size_in_bytes;
|
||||||
IDisk & disk;
|
IDisk & disk;
|
||||||
const MetadataStorageFromDisk & metadata_storage;
|
const MetadataStorageFromDisk & metadata_storage;
|
||||||
|
@ -12,9 +12,9 @@ namespace DB
|
|||||||
|
|
||||||
MetadataStorageFromPlainObjectStorage::MetadataStorageFromPlainObjectStorage(
|
MetadataStorageFromPlainObjectStorage::MetadataStorageFromPlainObjectStorage(
|
||||||
ObjectStoragePtr object_storage_,
|
ObjectStoragePtr object_storage_,
|
||||||
const std::string & object_storage_root_path_)
|
String storage_path_prefix_)
|
||||||
: object_storage(object_storage_)
|
: object_storage(object_storage_)
|
||||||
, object_storage_root_path(object_storage_root_path_)
|
, storage_path_prefix(std::move(storage_path_prefix_))
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -25,19 +25,15 @@ MetadataTransactionPtr MetadataStorageFromPlainObjectStorage::createTransaction(
|
|||||||
|
|
||||||
const std::string & MetadataStorageFromPlainObjectStorage::getPath() const
|
const std::string & MetadataStorageFromPlainObjectStorage::getPath() const
|
||||||
{
|
{
|
||||||
return object_storage_root_path;
|
return storage_path_prefix;
|
||||||
}
|
|
||||||
std::filesystem::path MetadataStorageFromPlainObjectStorage::getAbsolutePath(const std::string & path) const
|
|
||||||
{
|
|
||||||
return fs::path(object_storage_root_path) / path;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool MetadataStorageFromPlainObjectStorage::exists(const std::string & path) const
|
bool MetadataStorageFromPlainObjectStorage::exists(const std::string & path) const
|
||||||
{
|
{
|
||||||
/// NOTE: exists() cannot be used here since it works only for existing
|
/// NOTE: exists() cannot be used here since it works only for existing
|
||||||
/// key, and does not work for some intermediate path.
|
/// key, and does not work for some intermediate path.
|
||||||
std::string abs_path = getAbsolutePath(path);
|
auto object_key = object_storage->generateObjectKeyForPath(path);
|
||||||
return object_storage->existsOrHasAnyChild(abs_path);
|
return object_storage->existsOrHasAnyChild(object_key.serialize());
|
||||||
}
|
}
|
||||||
|
|
||||||
bool MetadataStorageFromPlainObjectStorage::isFile(const std::string & path) const
|
bool MetadataStorageFromPlainObjectStorage::isFile(const std::string & path) const
|
||||||
@ -48,7 +44,8 @@ bool MetadataStorageFromPlainObjectStorage::isFile(const std::string & path) con
|
|||||||
|
|
||||||
bool MetadataStorageFromPlainObjectStorage::isDirectory(const std::string & path) const
|
bool MetadataStorageFromPlainObjectStorage::isDirectory(const std::string & path) const
|
||||||
{
|
{
|
||||||
std::string directory = getAbsolutePath(path);
|
auto object_key = object_storage->generateObjectKeyForPath(path);
|
||||||
|
std::string directory = object_key.serialize();
|
||||||
if (!directory.ends_with('/'))
|
if (!directory.ends_with('/'))
|
||||||
directory += '/';
|
directory += '/';
|
||||||
|
|
||||||
@ -59,8 +56,8 @@ bool MetadataStorageFromPlainObjectStorage::isDirectory(const std::string & path
|
|||||||
|
|
||||||
uint64_t MetadataStorageFromPlainObjectStorage::getFileSize(const String & path) const
|
uint64_t MetadataStorageFromPlainObjectStorage::getFileSize(const String & path) const
|
||||||
{
|
{
|
||||||
RelativePathsWithMetadata children;
|
auto object_key = object_storage->generateObjectKeyForPath(path);
|
||||||
auto metadata = object_storage->tryGetObjectMetadata(getAbsolutePath(path));
|
auto metadata = object_storage->tryGetObjectMetadata(object_key.serialize());
|
||||||
if (metadata)
|
if (metadata)
|
||||||
return metadata->size_bytes;
|
return metadata->size_bytes;
|
||||||
return 0;
|
return 0;
|
||||||
@ -68,12 +65,14 @@ uint64_t MetadataStorageFromPlainObjectStorage::getFileSize(const String & path)
|
|||||||
|
|
||||||
std::vector<std::string> MetadataStorageFromPlainObjectStorage::listDirectory(const std::string & path) const
|
std::vector<std::string> MetadataStorageFromPlainObjectStorage::listDirectory(const std::string & path) const
|
||||||
{
|
{
|
||||||
RelativePathsWithMetadata files;
|
auto object_key = object_storage->generateObjectKeyForPath(path);
|
||||||
std::string abs_path = getAbsolutePath(path);
|
|
||||||
if (!abs_path.ends_with('/'))
|
|
||||||
abs_path += '/';
|
|
||||||
|
|
||||||
object_storage->listObjects(abs_path, files, 0);
|
RelativePathsWithMetadata files;
|
||||||
|
std::string abs_key = object_key.serialize();
|
||||||
|
if (!abs_key.ends_with('/'))
|
||||||
|
abs_key += '/';
|
||||||
|
|
||||||
|
object_storage->listObjects(abs_key, files, 0);
|
||||||
|
|
||||||
std::vector<std::string> result;
|
std::vector<std::string> result;
|
||||||
for (const auto & path_size : files)
|
for (const auto & path_size : files)
|
||||||
@ -84,8 +83,8 @@ std::vector<std::string> MetadataStorageFromPlainObjectStorage::listDirectory(co
|
|||||||
std::unordered_set<std::string> duplicates_filter;
|
std::unordered_set<std::string> duplicates_filter;
|
||||||
for (auto & row : result)
|
for (auto & row : result)
|
||||||
{
|
{
|
||||||
chassert(row.starts_with(abs_path));
|
chassert(row.starts_with(abs_key));
|
||||||
row.erase(0, abs_path.size());
|
row.erase(0, abs_key.size());
|
||||||
auto slash_pos = row.find_first_of('/');
|
auto slash_pos = row.find_first_of('/');
|
||||||
if (slash_pos != std::string::npos)
|
if (slash_pos != std::string::npos)
|
||||||
row.erase(slash_pos, row.size() - slash_pos);
|
row.erase(slash_pos, row.size() - slash_pos);
|
||||||
@ -105,10 +104,9 @@ DirectoryIteratorPtr MetadataStorageFromPlainObjectStorage::iterateDirectory(con
|
|||||||
|
|
||||||
StoredObjects MetadataStorageFromPlainObjectStorage::getStorageObjects(const std::string & path) const
|
StoredObjects MetadataStorageFromPlainObjectStorage::getStorageObjects(const std::string & path) const
|
||||||
{
|
{
|
||||||
std::string blob_name = object_storage->generateBlobNameForPath(path);
|
size_t object_size = getFileSize(path);
|
||||||
size_t object_size = getFileSize(blob_name);
|
auto object_key = object_storage->generateObjectKeyForPath(path);
|
||||||
auto object = StoredObject(getAbsolutePath(blob_name), object_size, path);
|
return {StoredObject(object_key.serialize(), object_size, path)};
|
||||||
return {std::move(object)};
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const IMetadataStorage & MetadataStorageFromPlainObjectStorageTransaction::getStorageForNonTransactionalReads() const
|
const IMetadataStorage & MetadataStorageFromPlainObjectStorageTransaction::getStorageForNonTransactionalReads() const
|
||||||
@ -118,7 +116,8 @@ const IMetadataStorage & MetadataStorageFromPlainObjectStorageTransaction::getSt
|
|||||||
|
|
||||||
void MetadataStorageFromPlainObjectStorageTransaction::unlinkFile(const std::string & path)
|
void MetadataStorageFromPlainObjectStorageTransaction::unlinkFile(const std::string & path)
|
||||||
{
|
{
|
||||||
auto object = StoredObject(metadata_storage.getAbsolutePath(path));
|
auto object_key = metadata_storage.object_storage->generateObjectKeyForPath(path);
|
||||||
|
auto object = StoredObject(object_key.serialize());
|
||||||
metadata_storage.object_storage->removeObject(object);
|
metadata_storage.object_storage->removeObject(object);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -131,7 +130,7 @@ void MetadataStorageFromPlainObjectStorageTransaction::createDirectoryRecursive(
|
|||||||
/// Noop. It is an Object Storage not a filesystem.
|
/// Noop. It is an Object Storage not a filesystem.
|
||||||
}
|
}
|
||||||
void MetadataStorageFromPlainObjectStorageTransaction::addBlobToMetadata(
|
void MetadataStorageFromPlainObjectStorageTransaction::addBlobToMetadata(
|
||||||
const std::string &, const std::string & /* blob_name */, uint64_t /* size_in_bytes */)
|
const std::string &, ObjectStorageKey /* object_key */, uint64_t /* size_in_bytes */)
|
||||||
{
|
{
|
||||||
/// Noop, local metadata files is only one file, it is the metadata file itself.
|
/// Noop, local metadata files is only one file, it is the metadata file itself.
|
||||||
}
|
}
|
||||||
|
@ -29,12 +29,10 @@ private:
|
|||||||
friend class MetadataStorageFromPlainObjectStorageTransaction;
|
friend class MetadataStorageFromPlainObjectStorageTransaction;
|
||||||
|
|
||||||
ObjectStoragePtr object_storage;
|
ObjectStoragePtr object_storage;
|
||||||
std::string object_storage_root_path;
|
String storage_path_prefix;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
MetadataStorageFromPlainObjectStorage(
|
MetadataStorageFromPlainObjectStorage(ObjectStoragePtr object_storage_, String storage_path_prefix_);
|
||||||
ObjectStoragePtr object_storage_,
|
|
||||||
const std::string & object_storage_root_path_);
|
|
||||||
|
|
||||||
MetadataTransactionPtr createTransaction() override;
|
MetadataTransactionPtr createTransaction() override;
|
||||||
|
|
||||||
@ -56,8 +54,6 @@ public:
|
|||||||
|
|
||||||
StoredObjects getStorageObjects(const std::string & path) const override;
|
StoredObjects getStorageObjects(const std::string & path) const override;
|
||||||
|
|
||||||
std::string getObjectStorageRootPath() const override { return object_storage_root_path; }
|
|
||||||
|
|
||||||
Poco::Timestamp getLastModified(const std::string & /* path */) const override
|
Poco::Timestamp getLastModified(const std::string & /* path */) const override
|
||||||
{
|
{
|
||||||
/// Required by MergeTree
|
/// Required by MergeTree
|
||||||
@ -71,9 +67,6 @@ public:
|
|||||||
|
|
||||||
bool supportsChmod() const override { return false; }
|
bool supportsChmod() const override { return false; }
|
||||||
bool supportsStat() const override { return false; }
|
bool supportsStat() const override { return false; }
|
||||||
|
|
||||||
private:
|
|
||||||
std::filesystem::path getAbsolutePath(const std::string & path) const;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
class MetadataStorageFromPlainObjectStorageTransaction final : public IMetadataTransaction
|
class MetadataStorageFromPlainObjectStorageTransaction final : public IMetadataTransaction
|
||||||
@ -89,14 +82,14 @@ public:
|
|||||||
|
|
||||||
const IMetadataStorage & getStorageForNonTransactionalReads() const override;
|
const IMetadataStorage & getStorageForNonTransactionalReads() const override;
|
||||||
|
|
||||||
void addBlobToMetadata(const std::string & path, const std::string & blob_name, uint64_t size_in_bytes) override;
|
void addBlobToMetadata(const std::string & path, ObjectStorageKey object_key, uint64_t size_in_bytes) override;
|
||||||
|
|
||||||
void createEmptyMetadataFile(const std::string & /* path */) override
|
void createEmptyMetadataFile(const std::string & /* path */) override
|
||||||
{
|
{
|
||||||
/// No metadata, no need to create anything.
|
/// No metadata, no need to create anything.
|
||||||
}
|
}
|
||||||
|
|
||||||
void createMetadataFile(const std::string & /* path */, const std::string & /* blob_name */, uint64_t /* size_in_bytes */) override
|
void createMetadataFile(const std::string & /* path */, ObjectStorageKey /* object_key */, uint64_t /* size_in_bytes */) override
|
||||||
{
|
{
|
||||||
/// Noop
|
/// Noop
|
||||||
}
|
}
|
||||||
|
@ -17,6 +17,7 @@
|
|||||||
#include <Interpreters/threadPoolCallbackRunner.h>
|
#include <Interpreters/threadPoolCallbackRunner.h>
|
||||||
#include <Disks/ObjectStorages/S3/diskSettings.h>
|
#include <Disks/ObjectStorages/S3/diskSettings.h>
|
||||||
|
|
||||||
|
#include <Common/getRandomASCIIString.h>
|
||||||
#include <Common/ProfileEvents.h>
|
#include <Common/ProfileEvents.h>
|
||||||
#include <Common/StringUtils/StringUtils.h>
|
#include <Common/StringUtils/StringUtils.h>
|
||||||
#include <Common/logger_useful.h>
|
#include <Common/logger_useful.h>
|
||||||
@ -127,7 +128,10 @@ private:
|
|||||||
result = !objects.empty();
|
result = !objects.empty();
|
||||||
|
|
||||||
for (const auto & object : objects)
|
for (const auto & object : objects)
|
||||||
batch.emplace_back(object.GetKey(), ObjectMetadata{static_cast<uint64_t>(object.GetSize()), Poco::Timestamp::fromEpochTime(object.GetLastModified().Seconds()), {}});
|
batch.emplace_back(
|
||||||
|
object.GetKey(),
|
||||||
|
ObjectMetadata{static_cast<uint64_t>(object.GetSize()), Poco::Timestamp::fromEpochTime(object.GetLastModified().Seconds()), {}}
|
||||||
|
);
|
||||||
|
|
||||||
if (result)
|
if (result)
|
||||||
request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken());
|
request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken());
|
||||||
@ -293,7 +297,12 @@ void S3ObjectStorage::listObjects(const std::string & path, RelativePathsWithMet
|
|||||||
break;
|
break;
|
||||||
|
|
||||||
for (const auto & object : objects)
|
for (const auto & object : objects)
|
||||||
children.emplace_back(object.GetKey(), ObjectMetadata{static_cast<uint64_t>(object.GetSize()), Poco::Timestamp::fromEpochTime(object.GetLastModified().Seconds()), {}});
|
children.emplace_back(
|
||||||
|
object.GetKey(),
|
||||||
|
ObjectMetadata{
|
||||||
|
static_cast<uint64_t>(object.GetSize()),
|
||||||
|
Poco::Timestamp::fromEpochTime(object.GetLastModified().Seconds()),
|
||||||
|
{}});
|
||||||
|
|
||||||
if (max_keys)
|
if (max_keys)
|
||||||
{
|
{
|
||||||
@ -524,12 +533,33 @@ std::unique_ptr<IObjectStorage> S3ObjectStorage::cloneObjectStorage(
|
|||||||
return std::make_unique<S3ObjectStorage>(
|
return std::make_unique<S3ObjectStorage>(
|
||||||
std::move(new_client), std::move(new_s3_settings),
|
std::move(new_client), std::move(new_s3_settings),
|
||||||
version_id, s3_capabilities, new_namespace,
|
version_id, s3_capabilities, new_namespace,
|
||||||
endpoint);
|
endpoint, object_key_prefix);
|
||||||
}
|
}
|
||||||
|
|
||||||
S3ObjectStorage::Clients::Clients(std::shared_ptr<S3::Client> client_, const S3ObjectStorageSettings & settings)
|
S3ObjectStorage::Clients::Clients(std::shared_ptr<S3::Client> client_, const S3ObjectStorageSettings & settings)
|
||||||
: client(std::move(client_)), client_with_long_timeout(client->clone(std::nullopt, settings.request_settings.long_request_timeout_ms)) {}
|
: client(std::move(client_)), client_with_long_timeout(client->clone(std::nullopt, settings.request_settings.long_request_timeout_ms)) {}
|
||||||
|
|
||||||
|
ObjectStorageKey S3ObjectStorage::generateObjectKeyForPath(const std::string &) const
|
||||||
|
{
|
||||||
|
/// Path to store the new S3 object.
|
||||||
|
|
||||||
|
/// Total length is 32 a-z characters for enough randomness.
|
||||||
|
/// First 3 characters are used as a prefix for
|
||||||
|
/// https://aws.amazon.com/premiumsupport/knowledge-center/s3-object-key-naming-pattern/
|
||||||
|
|
||||||
|
constexpr size_t key_name_total_size = 32;
|
||||||
|
constexpr size_t key_name_prefix_size = 3;
|
||||||
|
|
||||||
|
/// Path to store new S3 object.
|
||||||
|
String key = fmt::format("{}/{}",
|
||||||
|
getRandomASCIIString(key_name_prefix_size),
|
||||||
|
getRandomASCIIString(key_name_total_size - key_name_prefix_size));
|
||||||
|
|
||||||
|
/// what ever key_prefix value is, consider that key as relative
|
||||||
|
return ObjectStorageKey::createAsRelative(object_key_prefix, key);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
@ -59,8 +59,10 @@ private:
|
|||||||
String version_id_,
|
String version_id_,
|
||||||
const S3Capabilities & s3_capabilities_,
|
const S3Capabilities & s3_capabilities_,
|
||||||
String bucket_,
|
String bucket_,
|
||||||
String connection_string)
|
String connection_string,
|
||||||
: bucket(bucket_)
|
String object_key_prefix_)
|
||||||
|
: bucket(std::move(bucket_))
|
||||||
|
, object_key_prefix(std::move(object_key_prefix_))
|
||||||
, clients(std::make_unique<Clients>(std::move(client_), *s3_settings_))
|
, clients(std::make_unique<Clients>(std::move(client_), *s3_settings_))
|
||||||
, s3_settings(std::move(s3_settings_))
|
, s3_settings(std::move(s3_settings_))
|
||||||
, s3_capabilities(s3_capabilities_)
|
, s3_capabilities(s3_capabilities_)
|
||||||
@ -170,13 +172,17 @@ public:
|
|||||||
|
|
||||||
bool supportParallelWrite() const override { return true; }
|
bool supportParallelWrite() const override { return true; }
|
||||||
|
|
||||||
|
ObjectStorageKey generateObjectKeyForPath(const std::string & path) const override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void setNewSettings(std::unique_ptr<S3ObjectStorageSettings> && s3_settings_);
|
void setNewSettings(std::unique_ptr<S3ObjectStorageSettings> && s3_settings_);
|
||||||
|
|
||||||
void removeObjectImpl(const StoredObject & object, bool if_exists);
|
void removeObjectImpl(const StoredObject & object, bool if_exists);
|
||||||
void removeObjectsImpl(const StoredObjects & objects, bool if_exists);
|
void removeObjectsImpl(const StoredObjects & objects, bool if_exists);
|
||||||
|
|
||||||
|
private:
|
||||||
std::string bucket;
|
std::string bucket;
|
||||||
|
String object_key_prefix;
|
||||||
|
|
||||||
MultiVersion<Clients> clients;
|
MultiVersion<Clients> clients;
|
||||||
MultiVersion<S3ObjectStorageSettings> s3_settings;
|
MultiVersion<S3ObjectStorageSettings> s3_settings;
|
||||||
@ -195,7 +201,11 @@ private:
|
|||||||
class S3PlainObjectStorage : public S3ObjectStorage
|
class S3PlainObjectStorage : public S3ObjectStorage
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
std::string generateBlobNameForPath(const std::string & path) override { return path; }
|
ObjectStorageKey generateObjectKeyForPath(const std::string & path) const override
|
||||||
|
{
|
||||||
|
return ObjectStorageKey::createAsRelative(object_key_prefix, path);
|
||||||
|
}
|
||||||
|
|
||||||
std::string getName() const override { return "S3PlainObjectStorage"; }
|
std::string getName() const override { return "S3PlainObjectStorage"; }
|
||||||
|
|
||||||
template <class ...Args>
|
template <class ...Args>
|
||||||
|
@ -126,12 +126,15 @@ void registerDiskS3(DiskFactory & factory, bool global_skip_access_check)
|
|||||||
if (config.getBool(config_prefix + ".send_metadata", false))
|
if (config.getBool(config_prefix + ".send_metadata", false))
|
||||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "s3_plain does not supports send_metadata");
|
throw Exception(ErrorCodes::BAD_ARGUMENTS, "s3_plain does not supports send_metadata");
|
||||||
|
|
||||||
s3_storage = std::make_shared<S3PlainObjectStorage>(std::move(client), std::move(settings), uri.version_id, s3_capabilities, uri.bucket, uri.endpoint);
|
s3_storage = std::make_shared<S3PlainObjectStorage>(
|
||||||
|
std::move(client), std::move(settings), uri.version_id, s3_capabilities, uri.bucket, uri.endpoint, uri.key);
|
||||||
|
|
||||||
metadata_storage = std::make_shared<MetadataStorageFromPlainObjectStorage>(s3_storage, uri.key);
|
metadata_storage = std::make_shared<MetadataStorageFromPlainObjectStorage>(s3_storage, uri.key);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
s3_storage = std::make_shared<S3ObjectStorage>(std::move(client), std::move(settings), uri.version_id, s3_capabilities, uri.bucket, uri.endpoint);
|
s3_storage = std::make_shared<S3ObjectStorage>(
|
||||||
|
std::move(client), std::move(settings), uri.version_id, s3_capabilities, uri.bucket, uri.endpoint, uri.key);
|
||||||
auto [metadata_path, metadata_disk] = prepareForLocalMetadata(name, config, config_prefix, context);
|
auto [metadata_path, metadata_disk] = prepareForLocalMetadata(name, config, config_prefix, context);
|
||||||
metadata_storage = std::make_shared<MetadataStorageFromDisk>(metadata_disk, uri.key);
|
metadata_storage = std::make_shared<MetadataStorageFromDisk>(metadata_disk, uri.key);
|
||||||
}
|
}
|
||||||
|
@ -1,8 +1,11 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
|
#include <base/types.h>
|
||||||
|
|
||||||
|
#include <Disks/ObjectStorages/IObjectStorage_fwd.h>
|
||||||
|
|
||||||
#include <functional>
|
#include <functional>
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <Disks/ObjectStorages/IObjectStorage_fwd.h>
|
|
||||||
|
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
@ -11,20 +14,32 @@ namespace DB
|
|||||||
/// Object metadata: path, size, path_key_for_cache.
|
/// Object metadata: path, size, path_key_for_cache.
|
||||||
struct StoredObject
|
struct StoredObject
|
||||||
{
|
{
|
||||||
std::string remote_path;
|
String remote_path; /// abs path
|
||||||
std::string local_path; /// or equivalent "metadata_path"
|
String local_path; /// or equivalent "metadata_path"
|
||||||
|
|
||||||
uint64_t bytes_size = 0;
|
uint64_t bytes_size = 0;
|
||||||
|
|
||||||
StoredObject() = default;
|
StoredObject() = default;
|
||||||
|
|
||||||
explicit StoredObject(
|
explicit StoredObject(String remote_path_)
|
||||||
const std::string & remote_path_,
|
: remote_path(std::move(remote_path_))
|
||||||
uint64_t bytes_size_ = 0,
|
{}
|
||||||
const std::string & local_path_ = "")
|
|
||||||
: remote_path(remote_path_)
|
StoredObject(
|
||||||
, local_path(local_path_)
|
String remote_path_,
|
||||||
, bytes_size(bytes_size_) {}
|
uint64_t bytes_size_)
|
||||||
|
: remote_path(std::move(remote_path_))
|
||||||
|
, bytes_size(bytes_size_)
|
||||||
|
{}
|
||||||
|
|
||||||
|
StoredObject(
|
||||||
|
String remote_path_,
|
||||||
|
uint64_t bytes_size_,
|
||||||
|
String local_path_)
|
||||||
|
: remote_path(std::move(remote_path_))
|
||||||
|
, local_path(std::move(local_path_))
|
||||||
|
, bytes_size(bytes_size_)
|
||||||
|
{}
|
||||||
};
|
};
|
||||||
|
|
||||||
using StoredObjects = std::vector<StoredObject>;
|
using StoredObjects = std::vector<StoredObject>;
|
||||||
|
@ -28,7 +28,8 @@ MetadataTransactionPtr MetadataStorageFromStaticFilesWebServer::createTransactio
|
|||||||
|
|
||||||
const std::string & MetadataStorageFromStaticFilesWebServer::getPath() const
|
const std::string & MetadataStorageFromStaticFilesWebServer::getPath() const
|
||||||
{
|
{
|
||||||
return root_path;
|
static const String no_root;
|
||||||
|
return no_root;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool MetadataStorageFromStaticFilesWebServer::exists(const std::string & path) const
|
bool MetadataStorageFromStaticFilesWebServer::exists(const std::string & path) const
|
||||||
@ -96,7 +97,7 @@ std::vector<std::string> MetadataStorageFromStaticFilesWebServer::listDirectory(
|
|||||||
for (const auto & [file_path, _] : object_storage.files)
|
for (const auto & [file_path, _] : object_storage.files)
|
||||||
{
|
{
|
||||||
if (file_path.starts_with(path))
|
if (file_path.starts_with(path))
|
||||||
result.push_back(file_path);
|
result.push_back(file_path); /// It looks more like recursive listing, not sure it is right
|
||||||
}
|
}
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
@ -16,12 +16,9 @@ private:
|
|||||||
using FileType = WebObjectStorage::FileType;
|
using FileType = WebObjectStorage::FileType;
|
||||||
|
|
||||||
const WebObjectStorage & object_storage;
|
const WebObjectStorage & object_storage;
|
||||||
std::string root_path;
|
|
||||||
|
|
||||||
void assertExists(const std::string & path) const;
|
void assertExists(const std::string & path) const;
|
||||||
|
|
||||||
void initializeImpl(const String & uri_path, const std::unique_lock<std::shared_mutex> &) const;
|
|
||||||
|
|
||||||
public:
|
public:
|
||||||
explicit MetadataStorageFromStaticFilesWebServer(const WebObjectStorage & object_storage_);
|
explicit MetadataStorageFromStaticFilesWebServer(const WebObjectStorage & object_storage_);
|
||||||
|
|
||||||
@ -43,8 +40,6 @@ public:
|
|||||||
|
|
||||||
StoredObjects getStorageObjects(const std::string & path) const override;
|
StoredObjects getStorageObjects(const std::string & path) const override;
|
||||||
|
|
||||||
std::string getObjectStorageRootPath() const override { return ""; }
|
|
||||||
|
|
||||||
struct stat stat(const String & /* path */) const override { return {}; }
|
struct stat stat(const String & /* path */) const override { return {}; }
|
||||||
|
|
||||||
Poco::Timestamp getLastModified(const std::string & /* path */) const override
|
Poco::Timestamp getLastModified(const std::string & /* path */) const override
|
||||||
@ -80,7 +75,7 @@ public:
|
|||||||
/// No metadata, no need to create anything.
|
/// No metadata, no need to create anything.
|
||||||
}
|
}
|
||||||
|
|
||||||
void createMetadataFile(const std::string & /* path */, const std::string & /* blob_name */, uint64_t /* size_in_bytes */) override
|
void createMetadataFile(const std::string & /* path */, ObjectStorageKey /* object_key */, uint64_t /* size_in_bytes */) override
|
||||||
{
|
{
|
||||||
/// Noop
|
/// Noop
|
||||||
}
|
}
|
||||||
|
@ -89,7 +89,10 @@ public:
|
|||||||
const std::string & config_prefix,
|
const std::string & config_prefix,
|
||||||
ContextPtr context) override;
|
ContextPtr context) override;
|
||||||
|
|
||||||
std::string generateBlobNameForPath(const std::string & path) override { return path; }
|
ObjectStorageKey generateObjectKeyForPath(const std::string & path) const override
|
||||||
|
{
|
||||||
|
return ObjectStorageKey::createAsRelativeAnyway(path);
|
||||||
|
}
|
||||||
|
|
||||||
bool isRemote() const override { return true; }
|
bool isRemote() const override { return true; }
|
||||||
|
|
||||||
|
@ -843,8 +843,10 @@ StorageAzureBlobSource::GlobIterator::GlobIterator(
|
|||||||
/// We don't have to list bucket, because there is no asterisks.
|
/// We don't have to list bucket, because there is no asterisks.
|
||||||
if (key_prefix.size() == blob_path_with_globs.size())
|
if (key_prefix.size() == blob_path_with_globs.size())
|
||||||
{
|
{
|
||||||
ObjectMetadata object_metadata = object_storage->getObjectMetadata(blob_path_with_globs);
|
auto object_metadata = object_storage->getObjectMetadata(blob_path_with_globs);
|
||||||
blobs_with_metadata.emplace_back(blob_path_with_globs, object_metadata);
|
blobs_with_metadata.emplace_back(
|
||||||
|
blob_path_with_globs,
|
||||||
|
object_metadata);
|
||||||
if (outer_blobs)
|
if (outer_blobs)
|
||||||
outer_blobs->emplace_back(blobs_with_metadata.back());
|
outer_blobs->emplace_back(blobs_with_metadata.back());
|
||||||
if (file_progress_callback)
|
if (file_progress_callback)
|
||||||
@ -923,10 +925,12 @@ RelativePathWithMetadata StorageAzureBlobSource::GlobIterator::next()
|
|||||||
blobs_with_metadata = std::move(new_batch);
|
blobs_with_metadata = std::move(new_batch);
|
||||||
if (file_progress_callback)
|
if (file_progress_callback)
|
||||||
{
|
{
|
||||||
for (const auto & [_, info] : blobs_with_metadata)
|
for (const auto & [relative_path, info] : blobs_with_metadata)
|
||||||
|
{
|
||||||
file_progress_callback(FileProgress(0, info.size_bytes));
|
file_progress_callback(FileProgress(0, info.size_bytes));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
size_t current_index = index++;
|
size_t current_index = index++;
|
||||||
if (current_index >= blobs_with_metadata.size())
|
if (current_index >= blobs_with_metadata.size())
|
||||||
@ -970,7 +974,7 @@ StorageAzureBlobSource::KeysIterator::KeysIterator(
|
|||||||
ObjectMetadata object_metadata = object_storage->getObjectMetadata(key);
|
ObjectMetadata object_metadata = object_storage->getObjectMetadata(key);
|
||||||
if (file_progress_callback)
|
if (file_progress_callback)
|
||||||
file_progress_callback(FileProgress(0, object_metadata.size_bytes));
|
file_progress_callback(FileProgress(0, object_metadata.size_bytes));
|
||||||
keys.emplace_back(RelativePathWithMetadata{key, object_metadata});
|
keys.emplace_back(key, object_metadata);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (outer_blobs)
|
if (outer_blobs)
|
||||||
@ -1114,7 +1118,8 @@ StorageAzureBlobSource::ReaderHolder StorageAzureBlobSource::createReader()
|
|||||||
QueryPipelineBuilder builder;
|
QueryPipelineBuilder builder;
|
||||||
std::shared_ptr<ISource> source;
|
std::shared_ptr<ISource> source;
|
||||||
std::unique_ptr<ReadBuffer> read_buf;
|
std::unique_ptr<ReadBuffer> read_buf;
|
||||||
std::optional<size_t> num_rows_from_cache = need_only_count && getContext()->getSettingsRef().use_cache_for_count_from_files ? tryGetNumRowsFromCache(path_with_metadata) : std::nullopt;
|
std::optional<size_t> num_rows_from_cache = need_only_count && getContext()->getSettingsRef().use_cache_for_count_from_files
|
||||||
|
? tryGetNumRowsFromCache(path_with_metadata) : std::nullopt;
|
||||||
if (num_rows_from_cache)
|
if (num_rows_from_cache)
|
||||||
{
|
{
|
||||||
/// We should not return single chunk with all number of rows,
|
/// We should not return single chunk with all number of rows,
|
||||||
|
@ -65,7 +65,7 @@ Pipe StorageSystemRemoteDataPaths::read(
|
|||||||
if (disk->supportsCache())
|
if (disk->supportsCache())
|
||||||
cache = FileCacheFactory::instance().getByName(disk->getCacheName()).cache;
|
cache = FileCacheFactory::instance().getByName(disk->getCacheName()).cache;
|
||||||
|
|
||||||
for (const auto & [local_path, common_prefox_for_objects, storage_objects] : remote_paths_by_local_path)
|
for (const auto & [local_path, storage_objects] : remote_paths_by_local_path)
|
||||||
{
|
{
|
||||||
for (const auto & object : storage_objects)
|
for (const auto & object : storage_objects)
|
||||||
{
|
{
|
||||||
@ -78,7 +78,9 @@ Pipe StorageSystemRemoteDataPaths::read(
|
|||||||
col_local_path->insert(local_path);
|
col_local_path->insert(local_path);
|
||||||
col_remote_path->insert(object.remote_path);
|
col_remote_path->insert(object.remote_path);
|
||||||
col_size->insert(object.bytes_size);
|
col_size->insert(object.bytes_size);
|
||||||
col_namespace->insert(common_prefox_for_objects);
|
|
||||||
|
col_namespace->insertDefault();
|
||||||
|
//col_namespace->insert(common_prefox_for_objects);
|
||||||
|
|
||||||
if (cache)
|
if (cache)
|
||||||
{
|
{
|
||||||
|
@ -0,0 +1,10 @@
|
|||||||
|
<?xml version="1.0" encoding="utf-8"?>
|
||||||
|
|
||||||
|
<clickhouse>
|
||||||
|
<profiles>
|
||||||
|
<default>
|
||||||
|
<s3_check_objects_after_upload>1</s3_check_objects_after_upload>
|
||||||
|
<enable_s3_requests_logging>1</enable_s3_requests_logging>
|
||||||
|
</default>
|
||||||
|
</profiles>
|
||||||
|
</clickhouse>
|
@ -0,0 +1,11 @@
|
|||||||
|
<?xml version="1.0" encoding="utf-8"?>
|
||||||
|
|
||||||
|
<clickhouse>
|
||||||
|
<profiles>
|
||||||
|
<default>
|
||||||
|
<s3_check_objects_after_upload>1</s3_check_objects_after_upload>
|
||||||
|
<enable_s3_requests_logging>1</enable_s3_requests_logging>
|
||||||
|
<storage_metadata_write_full_object_key>1</storage_metadata_write_full_object_key>
|
||||||
|
</default>
|
||||||
|
</profiles>
|
||||||
|
</clickhouse>
|
@ -0,0 +1,47 @@
|
|||||||
|
<?xml version="1.0" encoding="utf-8"?>
|
||||||
|
|
||||||
|
<clickhouse>
|
||||||
|
<logger>
|
||||||
|
<level>test</level>
|
||||||
|
</logger>
|
||||||
|
|
||||||
|
<storage_configuration>
|
||||||
|
<disks>
|
||||||
|
<s3>
|
||||||
|
<type>s3</type>
|
||||||
|
<endpoint>http://minio1:9001/root/data/</endpoint>
|
||||||
|
<access_key_id>minio</access_key_id>
|
||||||
|
<secret_access_key>minio123</secret_access_key>
|
||||||
|
</s3>
|
||||||
|
<s3_plain>
|
||||||
|
<type>s3_plain</type>
|
||||||
|
<endpoint>http://minio1:9001/root/data/s3_pain_key_prefix</endpoint>
|
||||||
|
<access_key_id>minio</access_key_id>
|
||||||
|
<secret_access_key>minio123</secret_access_key>
|
||||||
|
<s3_allow_native_copy>true</s3_allow_native_copy>
|
||||||
|
</s3_plain>
|
||||||
|
</disks>
|
||||||
|
|
||||||
|
<policies>
|
||||||
|
<s3>
|
||||||
|
<volumes>
|
||||||
|
<main>
|
||||||
|
<disk>s3</disk>
|
||||||
|
</main>
|
||||||
|
</volumes>
|
||||||
|
</s3>
|
||||||
|
|
||||||
|
<s3_plain>
|
||||||
|
<volumes>
|
||||||
|
<main>
|
||||||
|
<disk>s3_plain</disk>
|
||||||
|
</main>
|
||||||
|
</volumes>
|
||||||
|
</s3_plain>
|
||||||
|
</policies>
|
||||||
|
</storage_configuration>
|
||||||
|
|
||||||
|
<merge_tree>
|
||||||
|
<storage_policy>s3</storage_policy>
|
||||||
|
</merge_tree>
|
||||||
|
</clickhouse>
|
@ -0,0 +1,296 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
import os
|
||||||
|
from helpers.cluster import ClickHouseCluster
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope="module")
|
||||||
|
def cluster():
|
||||||
|
cluster = ClickHouseCluster(__file__)
|
||||||
|
cluster.add_instance(
|
||||||
|
"node",
|
||||||
|
main_configs=[
|
||||||
|
"configs/storage_conf.xml",
|
||||||
|
],
|
||||||
|
user_configs=[
|
||||||
|
"configs/settings.xml",
|
||||||
|
],
|
||||||
|
with_minio=True,
|
||||||
|
macros={"replica": "1"},
|
||||||
|
with_zookeeper=True,
|
||||||
|
)
|
||||||
|
cluster.add_instance(
|
||||||
|
"new_node",
|
||||||
|
main_configs=[
|
||||||
|
"configs/storage_conf.xml",
|
||||||
|
],
|
||||||
|
user_configs=[
|
||||||
|
"configs/settings_new.xml",
|
||||||
|
],
|
||||||
|
with_minio=True,
|
||||||
|
macros={"replica": "2"},
|
||||||
|
with_zookeeper=True,
|
||||||
|
)
|
||||||
|
cluster.add_instance(
|
||||||
|
"switching_node",
|
||||||
|
main_configs=[
|
||||||
|
"configs/storage_conf.xml",
|
||||||
|
],
|
||||||
|
user_configs=[
|
||||||
|
"configs/settings.xml",
|
||||||
|
],
|
||||||
|
with_minio=True,
|
||||||
|
with_zookeeper=True,
|
||||||
|
stay_alive=True,
|
||||||
|
)
|
||||||
|
logging.info("Starting cluster...")
|
||||||
|
cluster.start()
|
||||||
|
logging.info("Cluster started")
|
||||||
|
|
||||||
|
yield cluster
|
||||||
|
|
||||||
|
# Actually, try/finally section is excess in pytest.fixtures
|
||||||
|
cluster.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
def get_part_path(node, table, part_name):
|
||||||
|
part_path = node.query(
|
||||||
|
f"SELECT path FROM system.parts WHERE table = '{table}' and name = '{part_name}'"
|
||||||
|
).strip()
|
||||||
|
|
||||||
|
return os.path.normpath(part_path)
|
||||||
|
|
||||||
|
|
||||||
|
def get_first_part_name(node, table):
|
||||||
|
part_name = node.query(
|
||||||
|
f"SELECT name FROM system.parts WHERE table = '{table}' and active LIMIT 1"
|
||||||
|
).strip()
|
||||||
|
return part_name
|
||||||
|
|
||||||
|
|
||||||
|
def read_file(node, file_path):
|
||||||
|
return node.exec_in_container(["bash", "-c", f"cat {file_path}"])
|
||||||
|
|
||||||
|
|
||||||
|
def write_file(node, file_path, data):
|
||||||
|
node.exec_in_container(["bash", "-c", f"echo '{data}' > {file_path}"])
|
||||||
|
|
||||||
|
|
||||||
|
def find_keys_for_local_path(node, local_path):
|
||||||
|
remote = node.query(
|
||||||
|
f"""
|
||||||
|
SELECT
|
||||||
|
remote_path
|
||||||
|
FROM
|
||||||
|
system.remote_data_paths
|
||||||
|
WHERE
|
||||||
|
concat(path, local_path) = '{local_path}'
|
||||||
|
"""
|
||||||
|
).split("\n")
|
||||||
|
return [x for x in remote if x]
|
||||||
|
|
||||||
|
|
||||||
|
def test_read_new_format(cluster):
|
||||||
|
node = cluster.instances["node"]
|
||||||
|
|
||||||
|
node.query(
|
||||||
|
"""
|
||||||
|
CREATE TABLE test_read_new_format (
|
||||||
|
id Int64,
|
||||||
|
data String
|
||||||
|
) ENGINE=MergeTree()
|
||||||
|
ORDER BY id
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
|
||||||
|
node.query("INSERT INTO test_read_new_format VALUES (1, 'Hello')")
|
||||||
|
|
||||||
|
part_name = get_first_part_name(node, "test_read_new_format")
|
||||||
|
part_path = get_part_path(node, "test_read_new_format", part_name)
|
||||||
|
primary_idx = os.path.join(part_path, "primary.cidx")
|
||||||
|
|
||||||
|
remote = find_keys_for_local_path(node, primary_idx)
|
||||||
|
assert len(remote) == 1
|
||||||
|
remote = remote[0]
|
||||||
|
|
||||||
|
node.query(f"ALTER TABLE test_read_new_format DETACH PART '{part_name}'")
|
||||||
|
|
||||||
|
detached_primary_idx = os.path.join(
|
||||||
|
os.path.dirname(part_path), "detached", part_name, "primary.cidx"
|
||||||
|
)
|
||||||
|
|
||||||
|
# manually change the metadata format and see that CH reads it correctly
|
||||||
|
meta_data = read_file(node, detached_primary_idx)
|
||||||
|
lines = meta_data.split("\n")
|
||||||
|
object_size, object_key = lines[2].split("\t")
|
||||||
|
assert remote.endswith(object_key), object_key
|
||||||
|
assert remote != object_key
|
||||||
|
lines[2] = f"{object_size}\t{remote}"
|
||||||
|
lines[0] = "5"
|
||||||
|
|
||||||
|
write_file(node, detached_primary_idx, "\n".join(lines))
|
||||||
|
|
||||||
|
active_count = node.query(
|
||||||
|
f"SELECT count() FROM system.parts WHERE table = 'test_read_new_format' and active"
|
||||||
|
).strip()
|
||||||
|
assert active_count == "0", active_count
|
||||||
|
|
||||||
|
node.query(f"ALTER TABLE test_read_new_format ATTACH PART '{part_name}'")
|
||||||
|
|
||||||
|
active_count = node.query(
|
||||||
|
f"SELECT count() FROM system.parts WHERE table = 'test_read_new_format' and active"
|
||||||
|
).strip()
|
||||||
|
assert active_count == "1", active_count
|
||||||
|
|
||||||
|
values = node.query(f"SELECT * FROM test_read_new_format").split("\n")
|
||||||
|
values = [x for x in values if x]
|
||||||
|
assert values == ["1\tHello"], values
|
||||||
|
|
||||||
|
# part name has changed after attach
|
||||||
|
part_name = get_first_part_name(node, "test_read_new_format")
|
||||||
|
part_path = get_part_path(node, "test_read_new_format", part_name)
|
||||||
|
primary_idx = os.path.join(part_path, "primary.cidx")
|
||||||
|
|
||||||
|
new_remote = find_keys_for_local_path(node, primary_idx)
|
||||||
|
assert len(new_remote) == 1
|
||||||
|
new_remote = new_remote[0]
|
||||||
|
assert remote == new_remote
|
||||||
|
|
||||||
|
|
||||||
|
def test_write_new_format(cluster):
|
||||||
|
node = cluster.instances["new_node"]
|
||||||
|
|
||||||
|
node.query(
|
||||||
|
"""
|
||||||
|
CREATE TABLE test_read_new_format (
|
||||||
|
id Int64,
|
||||||
|
data String
|
||||||
|
) ENGINE=MergeTree()
|
||||||
|
ORDER BY id
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
|
||||||
|
node.query("INSERT INTO test_read_new_format VALUES (1, 'Hello')")
|
||||||
|
|
||||||
|
part_name = get_first_part_name(node, "test_read_new_format")
|
||||||
|
part_path = get_part_path(node, "test_read_new_format", part_name)
|
||||||
|
primary_idx = os.path.join(part_path, "primary.cidx")
|
||||||
|
|
||||||
|
remote = find_keys_for_local_path(node, primary_idx)
|
||||||
|
assert len(remote) == 1
|
||||||
|
remote = remote[0]
|
||||||
|
|
||||||
|
node.query(f"ALTER TABLE test_read_new_format DETACH PART '{part_name}'")
|
||||||
|
|
||||||
|
detached_primary_idx = os.path.join(
|
||||||
|
os.path.dirname(part_path), "detached", part_name, "primary.cidx"
|
||||||
|
)
|
||||||
|
|
||||||
|
# manually change the metadata format and see that CH reads it correctly
|
||||||
|
meta_data = read_file(node, detached_primary_idx)
|
||||||
|
lines = meta_data.split("\n")
|
||||||
|
object_size, object_key = lines[2].split("\t")
|
||||||
|
assert remote.endswith(object_key), object_key
|
||||||
|
assert remote == object_key
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("storage_policy", ["s3", "s3_plain"])
|
||||||
|
def test_replicated_merge_tree(cluster, storage_policy):
|
||||||
|
if storage_policy == "s3_plain":
|
||||||
|
# MergeTree table doesn't work on s3_plain. Rename operation is not implemented
|
||||||
|
return
|
||||||
|
|
||||||
|
node_old = cluster.instances["node"]
|
||||||
|
node_new = cluster.instances["new_node"]
|
||||||
|
|
||||||
|
create_table_statement = f"""
|
||||||
|
CREATE TABLE test_replicated_merge_tree (
|
||||||
|
id Int64,
|
||||||
|
val String
|
||||||
|
) ENGINE=ReplicatedMergeTree('/clickhouse/tables/test_replicated_merge_tree_{storage_policy}', '{{replica}}')
|
||||||
|
PARTITION BY id
|
||||||
|
ORDER BY (id, val)
|
||||||
|
SETTINGS
|
||||||
|
storage_policy='{storage_policy}'
|
||||||
|
"""
|
||||||
|
|
||||||
|
node_old.query(create_table_statement)
|
||||||
|
node_new.query(create_table_statement)
|
||||||
|
|
||||||
|
node_old.query("INSERT INTO test_replicated_merge_tree VALUES (0, 'a')")
|
||||||
|
node_new.query("INSERT INTO test_replicated_merge_tree VALUES (1, 'b')")
|
||||||
|
|
||||||
|
# node_old have to fetch metadata from node_new and vice versa
|
||||||
|
node_old.query("SYSTEM SYNC REPLICA test_replicated_merge_tree")
|
||||||
|
node_new.query("SYSTEM SYNC REPLICA test_replicated_merge_tree")
|
||||||
|
|
||||||
|
count_old = node_old.query("SELECT count() FROM test_replicated_merge_tree").strip()
|
||||||
|
count_new = node_new.query("SELECT count() FROM test_replicated_merge_tree").strip()
|
||||||
|
|
||||||
|
assert count_old == "2"
|
||||||
|
assert count_new == "2"
|
||||||
|
|
||||||
|
node_old.query("DROP TABLE test_replicated_merge_tree SYNC")
|
||||||
|
node_new.query("DROP TABLE test_replicated_merge_tree SYNC")
|
||||||
|
|
||||||
|
|
||||||
|
def switch_config_write_full_object_key(node, enable):
|
||||||
|
setting_path = "/etc/clickhouse-server/users.d/settings.xml"
|
||||||
|
data = read_file(node, setting_path)
|
||||||
|
|
||||||
|
assert data != ""
|
||||||
|
|
||||||
|
is_on = "<storage_metadata_write_full_object_key>1</storage_metadata_write_full_object_key>"
|
||||||
|
is_off = "<storage_metadata_write_full_object_key>0</storage_metadata_write_full_object_key>"
|
||||||
|
|
||||||
|
enable_line = is_off
|
||||||
|
if enable:
|
||||||
|
enable_line = is_on
|
||||||
|
|
||||||
|
if is_on in data:
|
||||||
|
data = data.replace(is_on, enable_line)
|
||||||
|
else:
|
||||||
|
data = data.replace(is_off, enable_line)
|
||||||
|
|
||||||
|
write_file(node, setting_path, data)
|
||||||
|
node.restart_clickhouse()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("storage_policy", ["s3", "s3_plain"])
|
||||||
|
def test_log_table(cluster, storage_policy):
|
||||||
|
if storage_policy == "s3_plain":
|
||||||
|
# Log table doesn't work on s3_plain. Rename operation is not implemented
|
||||||
|
return
|
||||||
|
|
||||||
|
node = cluster.instances["switching_node"]
|
||||||
|
|
||||||
|
create_table_statement = f"""
|
||||||
|
CREATE TABLE test_log_table (
|
||||||
|
id Int64,
|
||||||
|
val String
|
||||||
|
) ENGINE=Log
|
||||||
|
SETTINGS
|
||||||
|
storage_policy='{storage_policy}'
|
||||||
|
"""
|
||||||
|
|
||||||
|
node.query(create_table_statement)
|
||||||
|
|
||||||
|
node.query("INSERT INTO test_log_table VALUES (0, 'a')")
|
||||||
|
assert "1" == node.query("SELECT count() FROM test_log_table").strip()
|
||||||
|
|
||||||
|
switch_config_write_full_object_key(node, True)
|
||||||
|
node.query("INSERT INTO test_log_table VALUES (0, 'a')")
|
||||||
|
assert "2" == node.query("SELECT count() FROM test_log_table").strip()
|
||||||
|
|
||||||
|
switch_config_write_full_object_key(node, False)
|
||||||
|
node.query("INSERT INTO test_log_table VALUES (1, 'b')")
|
||||||
|
assert "3" == node.query("SELECT count() FROM test_log_table").strip()
|
||||||
|
|
||||||
|
switch_config_write_full_object_key(node, True)
|
||||||
|
node.query("INSERT INTO test_log_table VALUES (2, 'c')")
|
||||||
|
assert "4" == node.query("SELECT count() FROM test_log_table").strip()
|
||||||
|
|
||||||
|
node.query("DROP TABLE test_log_table SYNC")
|
Loading…
Reference in New Issue
Block a user