ClickHouse/src/Disks/DiskCacheWrapper.cpp

308 lines
9.2 KiB
C++
Raw Normal View History

2020-07-29 15:05:02 +00:00
#include "DiskCacheWrapper.h"
#include <IO/copyData.h>
2021-04-20 18:29:03 +00:00
#include <IO/ReadBufferFromFileDecorator.h>
#include <IO/WriteBufferFromFileDecorator.h>
2020-07-29 15:05:02 +00:00
#include <Common/quoteString.h>
2020-07-30 13:42:05 +00:00
#include <condition_variable>
2020-07-29 15:05:02 +00:00
namespace DB
{
2020-07-30 13:42:05 +00:00
/**
2021-04-20 18:29:03 +00:00
* Write buffer with possibility to set and invoke callback after 'finalize' call.
2020-07-30 13:42:05 +00:00
*/
2021-04-20 18:29:03 +00:00
class CompletionAwareWriteBuffer : public WriteBufferFromFileDecorator
2020-07-30 13:42:05 +00:00
{
public:
2021-04-20 18:29:03 +00:00
CompletionAwareWriteBuffer(std::unique_ptr<WriteBufferFromFileBase> impl_, std::function<void()> completion_callback_)
: WriteBufferFromFileDecorator(std::move(impl_)), completion_callback(completion_callback_) { }
2020-07-30 13:42:05 +00:00
virtual ~CompletionAwareWriteBuffer() override
{
try
{
2021-11-10 22:58:56 +00:00
finalize();
}
catch (...)
{
2021-04-21 10:34:07 +00:00
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
2021-11-10 22:58:56 +00:00
void finalizeImpl() override
2020-07-30 13:42:05 +00:00
{
2021-11-10 22:58:56 +00:00
WriteBufferFromFileDecorator::finalizeImpl();
2021-04-20 18:29:03 +00:00
completion_callback();
2020-07-30 13:42:05 +00:00
}
private:
2020-07-30 13:42:05 +00:00
const std::function<void()> completion_callback;
};
/// Progress of copying one file from the underlying disk into the cache.
enum FileDownloadStatus
{
    NONE,        /// Nobody has started copying the file yet.
    DOWNLOADING, /// One thread is currently copying the file into the cache.
    DOWNLOADED,  /// Copying finished successfully.
    ERROR,       /// Copying failed.
};
/// Download state for one path, shared by every reader that requests that path concurrently.
struct FileDownloadMetadata
{
    /// Readers block on this while another thread copies the file into the cache.
    std::condition_variable condition;
    /// Current phase of the copy; starts out as NONE.
    FileDownloadStatus status{NONE};
};
2020-07-29 15:05:02 +00:00
/// @param delegate_ underlying disk that holds the authoritative data.
/// @param cache_disk_ local disk used to keep copies of selected files.
/// @param cache_file_predicate_ decides, by path, whether a file goes through the cache.
/// Parameters are taken by value and moved into place to avoid extra refcount/copy work.
DiskCacheWrapper::DiskCacheWrapper(
    std::shared_ptr<IDisk> delegate_, std::shared_ptr<DiskLocal> cache_disk_, std::function<bool(const String &)> cache_file_predicate_)
    : DiskDecorator(std::move(delegate_)), cache_disk(std::move(cache_disk_)), cache_file_predicate(std::move(cache_file_predicate_))
{
}
std::shared_ptr<FileDownloadMetadata> DiskCacheWrapper::acquireDownloadMetadata(const String & path) const
{
std::unique_lock<std::mutex> lock{mutex};
auto it = file_downloads.find(path);
if (it != file_downloads.end() && !it->second.expired())
return it->second.lock();
2020-07-29 18:15:20 +00:00
std::shared_ptr<FileDownloadMetadata> metadata(
new FileDownloadMetadata,
[this, path] (FileDownloadMetadata * p)
{
std::unique_lock<std::mutex> erase_lock{mutex};
file_downloads.erase(path);
delete p;
});
2020-07-29 15:05:02 +00:00
file_downloads.emplace(path, metadata);
return metadata;
}
std::unique_ptr<ReadBufferFromFileBase>
DiskCacheWrapper::readFile(
const String & path,
2021-08-16 00:00:32 +00:00
const ReadSettings & settings,
std::optional<size_t> size) const
2020-07-29 15:05:02 +00:00
{
if (!cache_file_predicate(path))
return DiskDecorator::readFile(path, settings, size);
2020-07-29 18:15:20 +00:00
LOG_DEBUG(log, "Read file {} from cache", backQuote(path));
2020-07-29 15:05:02 +00:00
if (cache_disk->exists(path))
return cache_disk->readFile(path, settings, size);
2020-07-29 15:05:02 +00:00
auto metadata = acquireDownloadMetadata(path);
{
std::unique_lock<std::mutex> lock{mutex};
if (metadata->status == NONE)
{
/// This thread will responsible for file downloading to cache.
metadata->status = DOWNLOADING;
LOG_DEBUG(log, "File {} doesn't exist in cache. Will download it", backQuote(path));
2020-07-29 15:05:02 +00:00
}
else if (metadata->status == DOWNLOADING)
{
LOG_DEBUG(log, "Waiting for file {} download to cache", backQuote(path));
2020-07-29 15:05:02 +00:00
metadata->condition.wait(lock, [metadata] { return metadata->status == DOWNLOADED || metadata->status == ERROR; });
}
}
if (metadata->status == DOWNLOADING)
{
FileDownloadStatus result_status = DOWNLOADED;
if (!cache_disk->exists(path))
{
try
{
auto dir_path = directoryPath(path);
2020-07-30 13:42:05 +00:00
if (!cache_disk->exists(dir_path))
cache_disk->createDirectories(dir_path);
2020-07-29 15:05:02 +00:00
auto tmp_path = path + ".tmp";
{
auto src_buffer = DiskDecorator::readFile(path, settings, size);
2021-08-16 00:00:32 +00:00
auto dst_buffer = cache_disk->writeFile(tmp_path, settings.local_fs_buffer_size, WriteMode::Rewrite);
copyData(*src_buffer, *dst_buffer);
}
cache_disk->moveFile(tmp_path, path);
2020-07-29 15:05:02 +00:00
LOG_DEBUG(log, "File {} downloaded to cache", backQuote(path));
2020-07-29 15:05:02 +00:00
}
catch (...)
{
tryLogCurrentException("DiskCache", "Failed to download file + " + backQuote(path) + " to cache");
2020-07-29 15:05:02 +00:00
result_status = ERROR;
}
}
/// Notify all waiters that file download is finished.
std::unique_lock<std::mutex> lock{mutex};
metadata->status = result_status;
lock.unlock();
metadata->condition.notify_all();
}
if (metadata->status == DOWNLOADED)
return cache_disk->readFile(path, settings, size);
2020-07-29 15:05:02 +00:00
return DiskDecorator::readFile(path, settings, size);
2020-07-29 15:05:02 +00:00
}
std::unique_ptr<WriteBufferFromFileBase>
2021-01-10 00:28:59 +00:00
DiskCacheWrapper::writeFile(const String & path, size_t buf_size, WriteMode mode)
2020-07-29 15:05:02 +00:00
{
if (!cache_file_predicate(path))
2021-01-10 00:28:59 +00:00
return DiskDecorator::writeFile(path, buf_size, mode);
2020-07-29 15:05:02 +00:00
LOG_DEBUG(log, "Write file {} to cache", backQuote(path));
2020-07-30 13:42:05 +00:00
auto dir_path = directoryPath(path);
2020-07-30 13:42:05 +00:00
if (!cache_disk->exists(dir_path))
cache_disk->createDirectories(dir_path);
2020-07-29 15:05:02 +00:00
return std::make_unique<CompletionAwareWriteBuffer>(
2021-01-10 00:28:59 +00:00
cache_disk->writeFile(path, buf_size, mode),
[this, path, buf_size, mode]()
2020-07-30 15:48:32 +00:00
{
2020-07-29 15:05:02 +00:00
/// Copy file from cache to actual disk when cached buffer is finalized.
auto src_buffer = cache_disk->readFile(path, ReadSettings(), /* size= */ {});
2021-01-10 00:28:59 +00:00
auto dst_buffer = DiskDecorator::writeFile(path, buf_size, mode);
2020-07-29 15:05:02 +00:00
copyData(*src_buffer, *dst_buffer);
dst_buffer->finalize();
2021-04-20 18:29:03 +00:00
});
2020-07-29 15:05:02 +00:00
}
void DiskCacheWrapper::clearDirectory(const String & path)
2020-07-29 15:05:02 +00:00
{
if (cache_disk->exists(path))
cache_disk->clearDirectory(path);
DiskDecorator::clearDirectory(path);
2020-07-29 15:05:02 +00:00
}
void DiskCacheWrapper::moveDirectory(const String & from_path, const String & to_path)
{
if (cache_disk->exists(from_path))
2021-03-23 10:33:07 +00:00
{
/// Destination directory may not be empty if previous directory move attempt was failed.
if (cache_disk->exists(to_path) && cache_disk->isDirectory(to_path))
cache_disk->clearDirectory(to_path);
2020-07-29 15:05:02 +00:00
cache_disk->moveDirectory(from_path, to_path);
2021-03-23 10:33:07 +00:00
}
2020-07-29 15:05:02 +00:00
DiskDecorator::moveDirectory(from_path, to_path);
}
void DiskCacheWrapper::moveFile(const String & from_path, const String & to_path)
{
if (cache_disk->exists(from_path))
2020-07-30 14:49:56 +00:00
{
auto dir_path = directoryPath(to_path);
if (!cache_disk->exists(dir_path))
cache_disk->createDirectories(dir_path);
2020-07-30 14:49:56 +00:00
2020-07-29 15:05:02 +00:00
cache_disk->moveFile(from_path, to_path);
2020-07-30 14:49:56 +00:00
}
2020-07-29 15:05:02 +00:00
DiskDecorator::moveFile(from_path, to_path);
}
void DiskCacheWrapper::replaceFile(const String & from_path, const String & to_path)
{
if (cache_disk->exists(from_path))
2020-07-30 14:49:56 +00:00
{
auto dir_path = directoryPath(to_path);
if (!cache_disk->exists(dir_path))
cache_disk->createDirectories(dir_path);
2020-07-30 14:49:56 +00:00
2020-07-29 15:05:02 +00:00
cache_disk->replaceFile(from_path, to_path);
2020-07-30 14:49:56 +00:00
}
2020-07-29 15:05:02 +00:00
DiskDecorator::replaceFile(from_path, to_path);
}
/// Drops the cached copy if there is one, then removes the file from the underlying disk
/// (via removeFile, not removeFileIfExists).
void DiskCacheWrapper::removeFile(const String & path)
{
    cache_disk->removeFileIfExists(path);

    DiskDecorator::removeFile(path);
}
/// Removes the file, if present, from both the cache and the underlying disk.
void DiskCacheWrapper::removeFileIfExists(const String & path)
{
    cache_disk->removeFileIfExists(path);

    DiskDecorator::removeFileIfExists(path);
}
void DiskCacheWrapper::removeDirectory(const String & path)
2020-07-29 15:05:02 +00:00
{
if (cache_disk->exists(path))
cache_disk->removeDirectory(path);
DiskDecorator::removeDirectory(path);
2020-07-29 15:05:02 +00:00
}
void DiskCacheWrapper::removeRecursive(const String & path)
2020-07-29 15:05:02 +00:00
{
if (cache_disk->exists(path))
cache_disk->removeRecursive(path);
DiskDecorator::removeRecursive(path);
}
2021-01-20 09:48:22 +00:00
/// Forwards removeSharedFile to the cache (when it holds `path`) and to the underlying disk,
/// passing the same `keep_s3` flag to both.
void DiskCacheWrapper::removeSharedFile(const String & path, bool keep_s3)
{
    const bool in_cache = cache_disk->exists(path);
    if (in_cache)
        cache_disk->removeSharedFile(path, keep_s3);

    DiskDecorator::removeSharedFile(path, keep_s3);
}
/// Forwards removeSharedRecursive to the cache (when it holds `path`) and to the underlying disk,
/// passing the same `keep_s3` flag to both.
void DiskCacheWrapper::removeSharedRecursive(const String & path, bool keep_s3)
{
    const bool in_cache = cache_disk->exists(path);
    if (in_cache)
        cache_disk->removeSharedRecursive(path, keep_s3);

    DiskDecorator::removeSharedRecursive(path, keep_s3);
}
void DiskCacheWrapper::createHardLink(const String & src_path, const String & dst_path)
{
/// Don't create hardlinks for cache files to shadow directory as it just waste cache disk space.
if (cache_disk->exists(src_path) && !dst_path.starts_with("shadow/"))
2020-07-30 14:49:56 +00:00
{
auto dir_path = directoryPath(dst_path);
2020-07-30 14:49:56 +00:00
if (!cache_disk->exists(dir_path))
cache_disk->createDirectories(dir_path);
2020-07-29 15:05:02 +00:00
cache_disk->createHardLink(src_path, dst_path);
2020-07-30 14:49:56 +00:00
}
2020-07-29 15:05:02 +00:00
DiskDecorator::createHardLink(src_path, dst_path);
}
2020-07-30 13:42:05 +00:00
/// Creates the directory in the cache first, then on the underlying disk.
void DiskCacheWrapper::createDirectory(const String & path)
{
    cache_disk->createDirectory(path);

    DiskDecorator::createDirectory(path);
}
/// Creates the directory path (including parents) in the cache first, then on the underlying disk.
void DiskCacheWrapper::createDirectories(const String & path)
{
    cache_disk->createDirectories(path);

    DiskDecorator::createDirectories(path);
}
2020-07-29 18:15:20 +00:00
/// Reserves space on the underlying disk. On success, the reservation is wrapped in a ReservationDelegate
/// bound to this wrapper (presumably so it reports this disk rather than the delegate — confirm in
/// ReservationDelegate). A null reservation is returned unchanged.
ReservationPtr DiskCacheWrapper::reserve(UInt64 bytes)
{
    auto reservation = DiskDecorator::reserve(bytes);
    if (!reservation)
        return reservation;

    auto self = std::static_pointer_cast<DiskCacheWrapper>(shared_from_this());
    return std::make_unique<ReservationDelegate>(std::move(reservation), self);
}
2020-07-29 15:05:02 +00:00
}