Merge pull request #36079 from kssenii/change-hashed-path

Change hashed path for cache
commit f37812f89a
Kseniia Sumarokova, 2022-04-20 10:27:44 +02:00 (committed by GitHub)
5 changed files with 6 additions and 13 deletions
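In short: the file cache is now keyed on the full remote path (the remote FS root prefix joined with the object's relative path) rather than a bare relative path or blob name, so, as far as these hunks show, the read and write sides derive the cache key from the same string and objects with the same relative name under different roots cannot collide. A minimal sketch of the idea, assuming std::filesystem and a hypothetical helper in place of FileCache::hash:

    #include <filesystem>
    #include <iostream>
    #include <string>

    namespace fs = std::filesystem;

    // Hypothetical helper: builds the string the cache hashes into a key.
    // Before this commit the bare relative path was hashed; after it, the
    // full remote path (root prefix + relative path) is.
    std::string cacheKeySource(const std::string & root_prefix, const std::string & relative_path)
    {
        return (fs::path(root_prefix) / relative_path).string();
    }

    int main()
    {
        // Two disks holding an object with the same relative name now yield distinct keys.
        std::cout << cacheKeySource("disk_a/data", "abc") << '\n'; // disk_a/data/abc
        std::cout << cacheKeySource("disk_b/data", "abc") << '\n'; // disk_b/data/abc
    }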

@@ -350,7 +350,7 @@ std::vector<String> IDiskRemote::getRemotePaths(const String & local_path) const
     std::vector<String> remote_paths;
     for (const auto & [remote_path, _] : metadata.remote_fs_objects)
-        remote_paths.push_back(remote_path);
+        remote_paths.push_back(fs::path(metadata.remote_fs_root_path) / remote_path);
     return remote_paths;
 }
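With the hunk above, getRemotePaths returns fully qualified remote paths (the root prefix joined with each stored relative path) instead of the bare relative names, matching the path form the cache hashes in the hunks below. A self-contained sketch, with a hypothetical shape for the metadata entries:

    #include <cstddef>
    #include <filesystem>
    #include <string>
    #include <utility>
    #include <vector>

    namespace fs = std::filesystem;

    // Hypothetical metadata shape: (relative remote path, object size) pairs.
    using RemoteFSObjects = std::vector<std::pair<std::string, size_t>>;

    std::vector<std::string> getRemotePaths(const std::string & remote_fs_root_path,
                                            const RemoteFSObjects & remote_fs_objects)
    {
        std::vector<std::string> remote_paths;
        remote_paths.reserve(remote_fs_objects.size());
        for (const auto & [remote_path, _] : remote_fs_objects)
            remote_paths.push_back((fs::path(remote_fs_root_path) / remote_path).string()); // full path, as above
        return remote_paths;
    }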

@@ -36,6 +36,7 @@ namespace ErrorCodes
 SeekableReadBufferPtr ReadBufferFromS3Gather::createImplementationBuffer(const String & path, size_t file_size)
 {
     current_path = path;
+    auto remote_path = fs::path(common_path_prefix) / path;
     auto cache = settings.remote_fs_cache;
     bool with_cache = cache
@@ -45,14 +46,14 @@ SeekableReadBufferPtr ReadBufferFromS3Gather::createImplementationBuffer(const String & path, size_t file_size)
     auto remote_file_reader_creator = [=, this]()
     {
         return std::make_unique<ReadBufferFromS3>(
-            client_ptr, bucket, fs::path(common_path_prefix) / path, max_single_read_retries,
+            client_ptr, bucket, remote_path, max_single_read_retries,
             settings, /* use_external_buffer */true, /* offset */ 0, read_until_position, /* restricted_seek */true);
     };

     if (with_cache)
     {
         return std::make_shared<CachedReadBufferFromRemoteFS>(
-            path, cache, remote_file_reader_creator, settings, read_until_position ? read_until_position : file_size);
+            remote_path, cache, remote_file_reader_creator, settings, read_until_position ? read_until_position : file_size);
     }

     return remote_file_reader_creator();
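On the read side, the remote_path computed once at the top of createImplementationBuffer now feeds both the plain ReadBufferFromS3 and the CachedReadBufferFromRemoteFS, so cache entries probed or created here are keyed by the full remote path. A simplified sketch of that plumbing, with hypothetical stand-in types in place of the real buffers:

    #include <filesystem>
    #include <memory>
    #include <string>

    namespace fs = std::filesystem;

    // Hypothetical stand-in: only the path plumbing matters here.
    struct Reader { std::string remote_path; bool cached = false; };

    std::shared_ptr<Reader> createImplementationBuffer(
        const std::string & common_path_prefix, const std::string & path, bool with_cache)
    {
        // Computed once and used by both branches, so the cached wrapper and
        // the plain reader agree on which path the cache key derives from.
        auto remote_path = (fs::path(common_path_prefix) / path).string();

        auto remote_file_reader_creator = [remote_path]
        {
            return std::make_shared<Reader>(Reader{remote_path, false});
        };

        if (with_cache)
            return std::make_shared<Reader>(Reader{remote_path, true}); // cached wrapper hashes remote_path

        return remote_file_reader_creator();
    }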

@@ -242,7 +242,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path,
         fs::path(remote_fs_root_path) / blob_name,
         settings->s3_settings,
         std::move(object_metadata),
-        buf_size, threadPoolCallbackRunner(getThreadPoolWriter()), blob_name, cache_on_write ? cache : nullptr);
+        buf_size, threadPoolCallbackRunner(getThreadPoolWriter()), cache_on_write ? cache : nullptr);

     auto create_metadata_callback = [this, path, blob_name, mode] (size_t count)
     {

@@ -35,7 +35,6 @@ const int S3_WARN_MAX_PARTS = 10000;
 namespace ErrorCodes
 {
     extern const int S3_ERROR;
-    extern const int LOGICAL_ERROR;
 }

 struct WriteBufferFromS3::UploadPartTask
@@ -63,7 +62,6 @@ WriteBufferFromS3::WriteBufferFromS3(
     std::optional<std::map<String, String>> object_metadata_,
     size_t buffer_size_,
     ScheduleFunc schedule_,
-    const String & blob_name_,
     FileCachePtr cache_)
     : BufferWithOwnMemory<WriteBuffer>(buffer_size_, nullptr, 0)
     , bucket(bucket_)
@@ -73,7 +71,6 @@ WriteBufferFromS3::WriteBufferFromS3(
     , upload_part_size(s3_settings_.min_upload_part_size)
     , s3_settings(s3_settings_)
     , schedule(std::move(schedule_))
-    , blob_name(blob_name_)
     , cache(cache_)
 {
     allocateBuffer();
@@ -97,10 +94,7 @@ void WriteBufferFromS3::nextImpl()
     if (cacheEnabled())
     {
-        if (blob_name.empty())
-            throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty blob name");
-
-        auto cache_key = cache->hash(blob_name);
+        auto cache_key = cache->hash(key);
         file_segments_holder.emplace(cache->setDownloading(cache_key, current_download_offset, size));
         current_download_offset += size;
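On the write side, WriteBufferFromS3 drops the separate blob_name and hashes the object key it already holds, which per the DiskS3::writeFile hunk above includes the root prefix; the empty-blob_name LOGICAL_ERROR guard thereby becomes unnecessary. A minimal sketch of the simplified cache branch, with hypothetical stand-ins for the FileCache API:

    #include <cstddef>
    #include <functional>
    #include <string>

    // Hypothetical stand-in for the FileCache API used in nextImpl().
    struct FileCache
    {
        using Key = std::size_t;
        Key hash(const std::string & s) const { return std::hash<std::string>{}(s); }
        void setDownloading(Key, std::size_t /*offset*/, std::size_t /*size*/) {}
    };

    // The buffer hashes the object key it already stores (a full remote
    // path), so no separately passed blob_name is needed anymore.
    struct WriteBufferSketch
    {
        std::string key;                      // e.g. remote_fs_root_path / blob_name
        FileCache * cache = nullptr;
        std::size_t current_download_offset = 0;

        void nextImpl(std::size_t size)
        {
            if (cache)
            {
                auto cache_key = cache->hash(key); // was cache->hash(blob_name)
                cache->setDownloading(cache_key, current_download_offset, size);
                current_download_offset += size;
            }
        }
    };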

@@ -55,7 +55,6 @@ public:
         std::optional<std::map<String, String>> object_metadata_ = std::nullopt,
         size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE,
         ScheduleFunc schedule_ = {},
-        const String & blob_name = "",
         FileCachePtr cache_ = nullptr);

     ~WriteBufferFromS3() override;
@@ -119,7 +118,6 @@ private:
     Poco::Logger * log = &Poco::Logger::get("WriteBufferFromS3");

-    const String blob_name;
     FileCachePtr cache;

     size_t current_download_offset = 0;
     std::optional<FileSegmentsHolder> file_segments_holder;