ClickHouse/src/Disks/DiskCacheWrapper.h
Azat Khuzhin 71a99ab27c Accept real file size in createReadBufferFromFileBase()
Right now streams rely on the correct file size, not the number of bytes
that will be read from the stream, to work around a bug in the Linux
kernel that may return EINVAL for pread() with an offset past EOF.

v2: Swap read_hint and file_size (since it is easy to miss something)

Previously the first argument to readFile()/createReadBufferFromFileBase()
was read_hint, not file_size, so let's preserve that order, since
it is easy to miss something otherwise.

This will also fix the 02051_read_settings test automatically, because now
MergeTreeReaderStream will pass estimated_sum_mark_range_bytes as
read_hint, not file_size. Previously this caused one of the following errors:
- Attempt to read after EOF w/ O_DIRECT
- and LOGICAL_ERROR while adjusting granules w/o O_DIRECT

This will also improve the zero-length read guard (via
ReadBufferFromEmptyFile) that was added in #30190

v3: fix for other storages that weren't enabled in fast-test
v4: ignore ENOENT/ENOTSUP in readFile
2022-01-04 10:53:17 +03:00


#pragma once
#include <functional>
#include <memory>
#include <mutex>
#include <optional>
#include <unordered_map>
#include <base/logger_useful.h>
#include "DiskDecorator.h"
#include "DiskLocal.h"
namespace DB
{
struct FileDownloadMetadata;
/**
 * Simple cache wrapper.
 * Tries to cache files matched by the predicate on the given local disk (cache disk).
 *
 * When writeFile() is invoked, the wrapper first writes the file to the cache.
 * After the write buffer is finalized, the actual file is stored on the underlying disk.
 *
 * When readFile() is invoked and the file exists in the cache, the wrapper reads it from the cache.
 * If the file doesn't exist in the cache, the wrapper downloads it from the underlying disk to the cache.
 * readFile() invocation is thread-safe.
 */
class DiskCacheWrapper : public DiskDecorator
{
public:
    DiskCacheWrapper(
        std::shared_ptr<IDisk> delegate_,
        std::shared_ptr<DiskLocal> cache_disk_,
        std::function<bool(const String &)> cache_file_predicate_);
    void createDirectory(const String & path) override;
    void createDirectories(const String & path) override;
    void clearDirectory(const String & path) override;
    void moveDirectory(const String & from_path, const String & to_path) override;
    void moveFile(const String & from_path, const String & to_path) override;
    void replaceFile(const String & from_path, const String & to_path) override;
    std::unique_ptr<ReadBufferFromFileBase> readFile(
        const String & path,
        const ReadSettings & settings,
        std::optional<size_t> read_hint,
        std::optional<size_t> file_size) const override;
    std::unique_ptr<WriteBufferFromFileBase> writeFile(const String & path, size_t buf_size, WriteMode mode) override;
    void removeFile(const String & path) override;
    void removeFileIfExists(const String & path) override;
    void removeDirectory(const String & path) override;
    void removeRecursive(const String & path) override;
    void removeSharedFile(const String & path, bool keep_s3) override;
    void removeSharedRecursive(const String & path, bool keep_s3) override;
    void createHardLink(const String & src_path, const String & dst_path) override;
    ReservationPtr reserve(UInt64 bytes) override;
private:
    std::shared_ptr<FileDownloadMetadata> acquireDownloadMetadata(const String & path) const;
    /// Local disk used to cache files.
    std::shared_ptr<DiskLocal> cache_disk;
    /// Only files that satisfy this predicate are cached.
    const std::function<bool(const String &)> cache_file_predicate;
    /// Contains information about file downloads to the cache that are currently running.
    mutable std::unordered_map<String, std::weak_ptr<FileDownloadMetadata>> file_downloads;
    /// Protects concurrent downloads of files to the cache.
    mutable std::mutex mutex;
    Poco::Logger * log = &Poco::Logger::get("DiskCache");
};
}
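
The header declares acquireDownloadMetadata() together with a mutex-guarded map of weak_ptr entries, but the implementation is not shown here. Below is a minimal standalone sketch of how such a per-path registry could hand out shared download state. It is not the ClickHouse implementation: DownloadState, DownloadRegistry, acquire() and activeCount() are hypothetical names, and the contents of FileDownloadMetadata are an assumption, since the struct is only forward-declared above.

```cpp
#include <cstddef>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>

// Hypothetical stand-in for FileDownloadMetadata; its real fields are not
// visible in the header, so these are assumptions for illustration only.
struct DownloadState
{
    std::mutex mutex;          // would serialize the actual download of one file
    bool downloaded = false;   // set once the file is present in the cache
};

// Hypothetical registry mirroring the file_downloads + mutex pair.
class DownloadRegistry
{
public:
    // Returns the state shared by every caller currently interested in `path`.
    // Callers that overlap in time receive the same object, so only one of
    // them needs to perform the download.
    std::shared_ptr<DownloadState> acquire(const std::string & path)
    {
        std::lock_guard<std::mutex> lock(mutex);

        auto it = downloads.find(path);
        if (it != downloads.end())
            if (auto existing = it->second.lock())
                return existing;

        // No live entry: create fresh state and remember it weakly, so the
        // registry itself never keeps the state alive.
        auto state = std::make_shared<DownloadState>();
        downloads[path] = state;
        return state;
    }

    // Counts entries whose state is still held by at least one caller.
    std::size_t activeCount() const
    {
        std::lock_guard<std::mutex> lock(mutex);
        std::size_t count = 0;
        for (const auto & entry : downloads)
            if (!entry.second.expired())
                ++count;
        return count;
    }

private:
    mutable std::mutex mutex;
    std::unordered_map<std::string, std::weak_ptr<DownloadState>> downloads;
};
```

Storing weak_ptr rather than shared_ptr means an entry expires on its own once the last reader drops its handle. This sketch leaves expired entries in the map until the same path is acquired again; a production version would likely erase them, for example from a custom deleter.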