Mirror of https://github.com/ClickHouse/ClickHouse.git, synced 2024-11-23 16:12:01 +00:00

Review fixes

Commit 1e1e6d4fa0 (parent 2064934e59)
@@ -72,15 +72,13 @@ void registerDiskAzureBlobStorage(DiskFactory & factory)
     {
         auto [metadata_path, metadata_disk] = prepareForLocalMetadata(name, config, config_prefix, context);

-        FileCachePtr cache = getCachePtrForDisk(name, config, config_prefix, context);
-
+        /// FIXME Cache currently unsupported :(
         ObjectStoragePtr azure_object_storage = std::make_unique<AzureObjectStorage>(
-            std::move(cache),
+            nullptr,
             name,
             getAzureBlobContainerClient(config, config_prefix),
             getAzureBlobStorageSettings(config, config_prefix, context));

         uint64_t copy_thread_pool_size = config.getUInt(config_prefix + ".thread_pool_size", 16);
         bool send_metadata = config.getBool(config_prefix + ".send_metadata", false);

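
The two config reads at the end of the hunk above pull per-disk settings from the server configuration with defaults (thread_pool_size -> 16, send_metadata -> false). Below is a minimal standalone sketch of that lookup pattern through Poco's AbstractConfiguration interface; the XML snippet and the "storage_configuration.disks.blob" prefix are invented for illustration and are not taken from this commit.

#include <Poco/AutoPtr.h>
#include <Poco/Util/XMLConfiguration.h>
#include <cstdint>
#include <iostream>
#include <sstream>
#include <string>

int main()
{
    std::istringstream xml(
        "<clickhouse>"
        "  <storage_configuration><disks><blob>"
        "    <thread_pool_size>32</thread_pool_size>"
        "    <send_metadata>true</send_metadata>"
        "  </blob></disks></storage_configuration>"
        "</clickhouse>");

    Poco::AutoPtr<Poco::Util::XMLConfiguration> config(new Poco::Util::XMLConfiguration(xml));

    const std::string config_prefix = "storage_configuration.disks.blob";
    // Same pattern as in the hunk: fall back to a default when the key is absent.
    uint64_t copy_thread_pool_size = config->getUInt(config_prefix + ".thread_pool_size", 16);
    bool send_metadata = config->getBool(config_prefix + ".send_metadata", false);

    std::cout << copy_thread_pool_size << " " << send_metadata << "\n";  // prints "32 1"
}
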
@@ -60,7 +60,6 @@ std::unique_ptr<ReadBufferFromFileBase> HDFSObjectStorage::readObjects( /// NOLINT
     return std::make_unique<SeekAvoidingReadBuffer>(std::move(buf), settings->min_bytes_for_seek);
 }
-
 /// Open the file for write and return WriteBufferFromFileBase object.
 std::unique_ptr<WriteBufferFromFileBase> HDFSObjectStorage::writeObject( /// NOLINT
     const std::string & path,
     WriteMode mode,
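
The readObjects path above wraps the HDFS read buffer in SeekAvoidingReadBuffer with a min_bytes_for_seek threshold. A rough, generic sketch of that idea, assuming the wrapper simply replaces short forward jumps with reads because re-reading a little data is cheaper than seeking a remote stream; the helper below is hypothetical and is not the ClickHouse class.

#include <cstddef>
#include <cstdio>

// Hypothetical decision helper: only forward jumps smaller than the threshold
// are served by reading and discarding data instead of a real seek.
bool shouldReadThroughInsteadOfSeek(size_t current_pos, size_t target_pos, size_t min_bytes_for_seek)
{
    return target_pos >= current_pos && (target_pos - current_pos) < min_bytes_for_seek;
}

int main()
{
    const size_t min_bytes_for_seek = 1024 * 1024;  // e.g. a 1 MiB threshold
    std::printf("%d\n", shouldReadThroughInsteadOfSeek(100, 5000, min_bytes_for_seek));        // 1: read through
    std::printf("%d\n", shouldReadThroughInsteadOfSeek(100, 10'000'000, min_bytes_for_seek));  // 0: real seek
}
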
@@ -31,9 +31,8 @@ void registerDiskHDFS(DiskFactory & factory)
             config.getInt(config_prefix + ".objects_chunk_size_to_delete", 1000),
             context_->getSettingsRef().hdfs_replication
         );
-        FileCachePtr cache = getCachePtrForDisk(name, config, config_prefix, context_);
-
-        ObjectStoragePtr hdfs_storage = std::make_unique<HDFSObjectStorage>(std::move(cache), uri, std::move(settings), config);
+        /// FIXME Cache currently unsupported :(
+        ObjectStoragePtr hdfs_storage = std::make_unique<HDFSObjectStorage>(nullptr, uri, std::move(settings), config);

         auto metadata_disk = prepareForLocalMetadata(name, config, config_prefix, context_).second;
         uint64_t copy_thread_pool_size = config.getUInt(config_prefix + ".thread_pool_size", 16);
@@ -72,14 +72,14 @@ public:
     /// at least size of object
     virtual ObjectMetadata getObjectMetadata(const std::string & path) const = 0;

-    /// Read single path from object storage, don't use cache
+    /// Read single path from object storage
     virtual std::unique_ptr<SeekableReadBuffer> readObject( /// NOLINT
         const std::string & path,
         const ReadSettings & read_settings = ReadSettings{},
         std::optional<size_t> read_hint = {},
         std::optional<size_t> file_size = {}) const = 0;

-    /// Read multiple objects with common prefix, use cache
+    /// Read multiple objects with common prefix
     virtual std::unique_ptr<ReadBufferFromFileBase> readObjects( /// NOLINT
         const std::string & common_path_prefix,
         const BlobsPathToSize & blobs_to_read,
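
The comment change records that caching is no longer implied by which read method is called; the declarations themselves are unchanged. For readers without the header at hand, here is a compact, self-contained sketch of the call shape of readObject with its defaulted optional hints; the types and the NullObjectStorage class are simplified stand-ins, not the actual ClickHouse declarations, and ReadSettings is omitted for brevity.

#include <cstddef>
#include <iostream>
#include <memory>
#include <optional>
#include <string>

// Simplified stand-in for the ClickHouse buffer type.
struct SeekableReadBuffer { virtual ~SeekableReadBuffer() = default; };

class IObjectStorageLike
{
public:
    virtual ~IObjectStorageLike() = default;

    /// Read a single path from object storage; both hints default to "unknown".
    virtual std::unique_ptr<SeekableReadBuffer> readObject(
        const std::string & path,
        std::optional<size_t> read_hint = {},
        std::optional<size_t> file_size = {}) const = 0;
};

// Dummy implementation just to make the call shape concrete.
class NullObjectStorage : public IObjectStorageLike
{
public:
    std::unique_ptr<SeekableReadBuffer> readObject(
        const std::string & path,
        std::optional<size_t>,
        std::optional<size_t>) const override
    {
        std::cout << "readObject(" << path << ")\n";
        return std::make_unique<SeekableReadBuffer>();
    }
};

int main()
{
    NullObjectStorage storage;
    const IObjectStorageLike & object_storage = storage;
    // Callers typically pass just the path and rely on the defaulted hints.
    auto buf = object_storage.readObject("store/abc/data.bin");
    std::cout << (buf != nullptr) << "\n";
}
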
@@ -139,7 +139,16 @@ std::unique_ptr<SeekableReadBuffer> S3ObjectStorage::readObject( /// NOLINT
     std::optional<size_t>) const
 {
     auto settings_ptr = s3_settings.get();
-    return std::make_unique<ReadBufferFromS3>(client.get(), bucket, path, version_id, settings_ptr->s3_settings.max_single_read_retries, read_settings);
+    ReadSettings disk_read_settings{read_settings};
+    if (cache)
+    {
+        if (IFileCache::isReadOnly())
+            disk_read_settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache = true;
+
+        disk_read_settings.remote_fs_cache = cache;
+    }
+
+    return std::make_unique<ReadBufferFromS3>(client.get(), bucket, path, version_id, settings_ptr->s3_settings.max_single_read_retries, disk_read_settings);
 }

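
The new code copies the caller's ReadSettings and, only when the storage has a file cache attached, enables cache usage on that copy (reading from the cache if the data is already there, otherwise bypassing it, when the cache is read-only). A minimal sketch of the same "copy settings, conditionally attach cache" pattern with made-up types; ReadSettings, FileCache and the read-only flag below are stand-ins, not the ClickHouse definitions.

#include <iostream>
#include <memory>

struct FileCache
{
    bool read_only = false;  // stand-in for IFileCache::isReadOnly()
};

struct ReadSettings
{
    std::shared_ptr<FileCache> remote_fs_cache;
    bool read_from_cache_if_exists_otherwise_bypass_cache = false;
};

ReadSettings patchSettingsForCache(const ReadSettings & read_settings, std::shared_ptr<FileCache> cache)
{
    ReadSettings disk_read_settings{read_settings};  // never mutate the caller's settings
    if (cache)
    {
        if (cache->read_only)
            disk_read_settings.read_from_cache_if_exists_otherwise_bypass_cache = true;

        disk_read_settings.remote_fs_cache = cache;
    }
    return disk_read_settings;
}

int main()
{
    ReadSettings base;
    auto with_cache = patchSettingsForCache(base, std::make_shared<FileCache>());
    std::cout << std::boolalpha
              << (with_cache.remote_fs_cache != nullptr) << "\n"   // true: the copy carries the cache
              << (base.remote_fs_cache != nullptr) << "\n";        // false: the original is untouched
}
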
@@ -190,12 +199,13 @@ void S3ObjectStorage::listPrefix(const std::string & path, BlobsPathToSize & children)

         auto result = outcome.GetResult();
         auto objects = result.GetContents();
-        for (const auto & object : objects)
-            children.emplace_back(object.GetKey(), object.GetSize());

         if (objects.empty())
             break;
+
+        for (const auto & object : objects)
+            children.emplace_back(object.GetKey(), object.GetSize());

         request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken());
     } while (outcome.GetResult().GetIsTruncated());
 }
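
The loop above pages through listing results with a continuation token and stops when GetIsTruncated() reports no further pages; the review fix also bails out early when a page comes back empty. A standalone sketch of the same pagination pattern against the AWS SDK for C++; the bucket and prefix are placeholders and error handling is reduced to a break.

#include <aws/core/Aws.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/ListObjectsV2Request.h>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

int main()
{
    Aws::SDKOptions options;
    Aws::InitAPI(options);
    {
        Aws::S3::S3Client client;  // credentials/region come from the default provider chain
        std::vector<std::pair<std::string, size_t>> children;

        Aws::S3::Model::ListObjectsV2Request request;
        request.SetBucket("my-bucket");        // placeholder
        request.SetPrefix("data/partition/");  // placeholder

        Aws::S3::Model::ListObjectsV2Outcome outcome;
        do
        {
            outcome = client.ListObjectsV2(request);
            if (!outcome.IsSuccess())
                break;  // a real implementation would log or throw here

            const auto & objects = outcome.GetResult().GetContents();
            if (objects.empty())
                break;

            for (const auto & object : objects)
                children.emplace_back(object.GetKey().c_str(), static_cast<size_t>(object.GetSize()));

            // Ask for the next page; GetIsTruncated() tells us whether one exists.
            request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken());
        } while (outcome.GetResult().GetIsTruncated());

        std::cout << "listed " << children.size() << " objects\n";
    }
    Aws::ShutdownAPI(options);
}
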
@@ -249,7 +259,8 @@ void S3ObjectStorage::removeObjects(const std::vector<std::string> & paths)
         request.SetBucket(bucket);
         request.SetDelete(delkeys);
         auto outcome = client_ptr->DeleteObjects(request);
-        logIfError(outcome, [&](){return "Can't remove AWS keys: " + keys;});
+        if (outcome.GetError().GetErrorType() != Aws::S3::S3Errors::RESOURCE_NOT_FOUND)
+            throwIfError(outcome);
     }
 }

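
Both this hunk and the removeObjectIfExists one below stop ignoring or merely logging the outcome and instead throw on every error except RESOURCE_NOT_FOUND, which keeps deleting already-missing keys a harmless no-op. A standalone sketch of that pattern with the AWS SDK for C++; the bucket and keys are placeholders, and throwUnlessMissing is a stand-in for ClickHouse's throwIfError helper.

#include <aws/core/Aws.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/Delete.h>
#include <aws/s3/model/DeleteObjectsRequest.h>
#include <aws/s3/model/ObjectIdentifier.h>
#include <iostream>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Stand-in for throwIfError(): fail on any error except "not found".
template <typename Outcome>
void throwUnlessMissing(const Outcome & outcome)
{
    if (!outcome.IsSuccess()
        && outcome.GetError().GetErrorType() != Aws::S3::S3Errors::RESOURCE_NOT_FOUND)
        throw std::runtime_error(outcome.GetError().GetMessage().c_str());
}

int main()
{
    Aws::SDKOptions options;
    Aws::InitAPI(options);
    {
        Aws::S3::S3Client client;
        const std::vector<std::string> paths = {"data/a.bin", "data/b.bin"};  // placeholder keys

        Aws::S3::Model::Delete delkeys;
        for (const auto & path : paths)
        {
            Aws::S3::Model::ObjectIdentifier obj;
            obj.SetKey(path.c_str());
            delkeys.AddObjects(std::move(obj));
        }

        Aws::S3::Model::DeleteObjectsRequest request;
        request.SetBucket("my-bucket");  // placeholder
        request.SetDelete(delkeys);

        auto outcome = client.DeleteObjects(request);
        throwUnlessMissing(outcome);
        std::cout << "batch delete issued\n";
    }
    Aws::ShutdownAPI(options);
}
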
@@ -265,7 +276,9 @@ void S3ObjectStorage::removeObjectIfExists(const std::string & path)
     Aws::S3::Model::DeleteObjectsRequest request;
     request.SetBucket(bucket);
     request.SetDelete(delkeys);
-    client_ptr->DeleteObjects(request);
+    auto outcome = client_ptr->DeleteObjects(request);
+    if (outcome.GetError().GetErrorType() != Aws::S3::S3Errors::RESOURCE_NOT_FOUND)
+        throwIfError(outcome);
 }

 void S3ObjectStorage::removeObjectsIfExist(const std::vector<std::string> & paths)
@@ -24,10 +24,10 @@ public:
         size_t max_single_download_retries_,
         size_t tmp_buffer_size_,
         bool use_external_buffer_ = false,
-        size_t read_until_position_ = 0
-    );
+        size_t read_until_position_ = 0);
+
     off_t seek(off_t off, int whence) override;

     off_t getPosition() override;

     bool nextImpl() override;