change LRUCache to LRUResourceCache

This commit is contained in:
lgbo-ustc 2022-01-04 15:16:24 +08:00 committed by liangjiabiao
parent 266ca90575
commit 45c86757bf
8 changed files with 120 additions and 267 deletions

View File

@ -6,11 +6,13 @@
#include <chrono>
#include <mutex>
#include <atomic>
#include <base/logger_useful.h>
namespace DB
{
template <typename T>
struct TrivialWeightFunction
{
@ -20,43 +22,19 @@ struct TrivialWeightFunction
}
};
template <typename T>
struct TrivialLRUCacheEvictPolicy
{
inline bool canRelease(std::shared_ptr<T>) const
{
return true;
}
inline void release(std::shared_ptr<T>)
{
}
};
/// Thread-safe cache that evicts entries which are not used for a long time.
/// WeightFunction is a functor that takes Mapped as a parameter and returns "weight" (approximate size)
/// of that value.
/// Cache starts to evict entries when their total weight exceeds max_size.
/// Value weight should not change after insertion.
template <typename TKey,
typename TMapped,
typename HashFunction = std::hash<TKey>,
typename WeightFunction = TrivialWeightFunction<TMapped>,
typename EvictPolicy = TrivialLRUCacheEvictPolicy<TMapped>>
template <typename TKey, typename TMapped, typename HashFunction = std::hash<TKey>, typename WeightFunction = TrivialWeightFunction<TMapped>>
class LRUCache
{
public:
using Key = TKey;
using Mapped = TMapped;
using MappedPtr = std::shared_ptr<Mapped>;
struct Result
{
MappedPtr value;
bool cache_miss = true;
// set_successful is not trustworthy for getOrSet, because removeOverflow is called right after putting key in cache
bool set_successful = false;
};
/** Initialize LRUCache with max_size and max_elements_size.
* max_elements_size == 0 means no elements size restrictions.
@ -82,27 +60,10 @@ public:
void set(const Key & key, const MappedPtr & mapped)
{
std::lock_guard lock(mutex);
setImpl(key, mapped, lock);
}
/**
* trySet() will fail (return false) if there is no space left and no keys could be evicted.
* Eviction permission of each key is defined by EvictPolicy. In default policy there is no restriction.
*/
bool trySet(const Key & key, const MappedPtr & mapped)
{
std::lock_guard lock(mutex);
return setImpl(key, mapped, lock);
}
template <typename LoadFunc>
std::pair<MappedPtr, bool> getOrSet(const Key & key, LoadFunc && load_func)
{
auto result = getOrTrySet(key, std::move(load_func));
return std::make_pair(result.value, result.cache_miss);
}
void remove(const Key & key)
{
std::lock_guard lock(mutex);
@ -117,13 +78,14 @@ public:
/// If the value for the key is in the cache, returns it. If it is not, calls load_func() to
/// produce it, saves the result in the cache and returns it.
/// Only one of several concurrent threads calling getOrTrySet() will call load_func(),
/// Only one of several concurrent threads calling getOrSet() will call load_func(),
/// others will wait for that call to complete and will use its result (this helps prevent cache stampede).
/// Exceptions occurring in load_func will be propagated to the caller. Another thread from the
/// set of concurrent threads will then try to call its load_func etc.
///
/// Returns std::pair of the cached value and a bool indicating whether the value was produced during this call.
template <typename LoadFunc>
Result getOrTrySet(const Key &key, LoadFunc && load_func)
std::pair<MappedPtr, bool> getOrSet(const Key & key, LoadFunc && load_func)
{
InsertTokenHolder token_holder;
{
@ -133,7 +95,7 @@ public:
if (val)
{
++hits;
return {val, false, false};
return std::make_pair(val, false);
}
auto & token = insert_tokens[key];
@ -153,7 +115,7 @@ public:
{
/// Another thread already produced the value while we waited for token->mutex.
++hits;
return {token->value, false, false};
return std::make_pair(token->value, false);
}
++misses;
@ -163,39 +125,18 @@ public:
/// Insert the new value only if the token is still in present in insert_tokens.
/// (The token may be absent because of a concurrent reset() call).
bool is_value_loaded = false;
bool is_value_updated = false;
bool result = false;
auto token_it = insert_tokens.find(key);
if (token_it != insert_tokens.end() && token_it->second.get() == token)
{
// setImpl() may fail, but the final behavior seems not be affected
// next call of getOrTrySet() will still call load_func()
is_value_updated = setImpl(key, token->value, cache_lock);
is_value_loaded = true;
setImpl(key, token->value, cache_lock);
result = true;
}
if (!token->cleaned_up)
token_holder.cleanup(token_lock, cache_lock);
return {token->value, is_value_loaded, is_value_updated};
}
/// If key is not in cache or the element can be released, return is true. otherwise, return is false
bool tryRemove(const Key & key)
{
std::lock_guard loc(mutex);
auto it = cells.find(key);
if (it == cells.end())
return true;
auto & cell = it->second;
if (!evict_policy.canRelease(cell.value))
return false;
evict_policy.release(cell.value);
current_size -= cell.size;
cells.erase(it);
queue.erase(cell.queue_iterator);
return true;
return std::make_pair(token->value, result);
}
void getStats(size_t & out_hits, size_t & out_misses) const
@ -330,7 +271,6 @@ private:
std::atomic<size_t> misses {0};
WeightFunction weight_function;
EvictPolicy evict_policy;
MappedPtr getImpl(const Key & key, [[maybe_unused]] std::lock_guard<std::mutex> & cache_lock)
{
@ -348,7 +288,7 @@ private:
return cell.value;
}
bool setImpl(const Key & key, const MappedPtr & mapped, [[maybe_unused]] std::lock_guard<std::mutex> & cache_lock)
void setImpl(const Key & key, const MappedPtr & mapped, [[maybe_unused]] std::lock_guard<std::mutex> & cache_lock)
{
auto [it, inserted] = cells.emplace(std::piecewise_construct,
std::forward_as_tuple(key),
@ -358,14 +298,6 @@ private:
if (inserted)
{
auto value_weight = mapped ? weight_function(*mapped) : 0;
if (!removeOverflow(value_weight))
{
// cannot find enough space to put in the new value
cells.erase(it);
return false;
}
try
{
cell.queue_iterator = queue.insert(queue.end(), key);
@ -378,13 +310,6 @@ private:
}
else
{
if (!evict_policy.canRelease(cell.value))
{
// the old value is referred by someone, cannot release now
// in default policy, it is always true.
return false;
}
evict_policy.release(cell.value); // release the old value. this action is empty in default policy.
current_size -= cell.size;
queue.splice(queue.end(), queue, cell.queue_iterator);
}
@ -393,18 +318,17 @@ private:
cell.size = cell.value ? weight_function(*cell.value) : 0;
current_size += cell.size;
return true;
removeOverflow();
}
bool removeOverflow(size_t required_size_to_remove = 0)
void removeOverflow()
{
size_t current_weight_lost = 0;
size_t queue_size = cells.size();
auto key_it = queue.begin();
auto is_overflow = [&] { return (current_size + required_size_to_remove > max_size || (max_elements_size != 0 && queue_size > max_elements_size)); };
while (is_overflow() && (queue_size > 1) && (key_it != queue.end()))
while ((current_size > max_size || (max_elements_size != 0 && queue_size > max_elements_size)) && (queue_size > 1))
{
const Key & key = *key_it;
const Key & key = queue.front();
auto it = cells.find(key);
if (it == cells.end())
@ -414,23 +338,13 @@ private:
}
const auto & cell = it->second;
if (evict_policy.canRelease(cell.value))// in default, it is true
{
// always call release() before erasing an element
// in default, it's an empty action
evict_policy.release(cell.value);
current_size -= cell.size;
current_weight_lost += cell.size;
current_size -= cell.size;
current_weight_lost += cell.size;
cells.erase(it);
key_it = queue.erase(key_it);
--queue_size;
}
else
{
key_it++;
}
cells.erase(it);
queue.pop_front();
--queue_size;
}
onRemoveOverflowWeightLoss(current_weight_lost);
@ -440,7 +354,6 @@ private:
LOG_ERROR(&Poco::Logger::get("LRUCache"), "LRUCache became inconsistent. There must be a bug in it.");
abort();
}
return !is_overflow();
}
/// Override this method if you want to track how much weight was lost in removeOverflow method.

View File

@ -324,7 +324,7 @@ private:
{
auto weight = value ? weight_function(*value) : 0;
auto queue_size = cells.size() + 1;
auto loss_weight = 0;
size_t loss_weight = 0;
auto is_overflow = [&] {
return current_weight + weight - loss_weight > max_weight || (max_element_size != 0 && queue_size > max_element_size);
};

View File

@ -1,23 +1,23 @@
#include <functional>
#include <memory>
#include <unistd.h>
#include <functional>
#include <Core/BackgroundSchedulePool.h>
#include <Poco/Logger.h>
#include <IO/WriteHelpers.h>
#include <Storages/Cache/ExternalDataSourceCache.h>
#include <Storages/Cache/RemoteFileMetadataFactory.h>
#include <base/errnoToString.h>
#include <base/logger_useful.h>
#include <base/sleep.h>
#include <base/errnoToString.h>
#include <Poco/Logger.h>
#include <Common/ErrorCodes.h>
#include <Common/Exception.h>
#include <Common/ProfileEvents.h>
#include <Common/SipHash.h>
#include <Common/hex.h>
#include <Common/Exception.h>
#include <Storages/Cache/ExternalDataSourceCache.h>
#include <Storages/Cache/RemoteFileMetadataFactory.h>
#include <IO/WriteHelpers.h>
namespace ProfileEvents
{
extern const Event ExternalDataSourceLocalCacheReadBytes;
extern const Event ExternalDataSourceLocalCacheReadBytes;
}
namespace DB
{
@ -27,31 +27,26 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
LocalFileHolder::LocalFileHolder(std::shared_ptr<RemoteCacheController> cache_controller):file_cache_controller(cache_controller)
LocalFileHolder::LocalFileHolder(RemoteFileCacheType::MappedHolderPtr cache_controller) : file_cache_controller(std::move(cache_controller))
{
file_buffer = file_cache_controller->allocFile();
file_buffer = file_cache_controller->value().allocFile();
if (!file_buffer)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Create file readbuffer failed. {}",
file_cache_controller->getLocalPath().string());
}
LocalFileHolder::~LocalFileHolder()
{
if (file_cache_controller)
file_cache_controller->deallocFile(std::move(file_buffer));
throw Exception(
ErrorCodes::LOGICAL_ERROR, "Create file readbuffer failed. {}", file_cache_controller->value().getLocalPath().string());
}
RemoteReadBuffer::RemoteReadBuffer(size_t buff_size) : BufferWithOwnMemory<SeekableReadBufferWithSize>(buff_size)
{
}
std::unique_ptr<ReadBuffer> RemoteReadBuffer::create(ContextPtr context, IRemoteFileMetadataPtr remote_file_metadata, std::unique_ptr<ReadBuffer> read_buffer, size_t buff_size)
std::unique_ptr<ReadBuffer> RemoteReadBuffer::create(
ContextPtr context, IRemoteFileMetadataPtr remote_file_metadata, std::unique_ptr<ReadBuffer> read_buffer, size_t buff_size)
{
auto remote_path = remote_file_metadata->remote_path;
auto remote_read_buffer = std::make_unique<RemoteReadBuffer>(buff_size);
std::tie(remote_read_buffer->local_file_holder, read_buffer) = ExternalDataSourceCache::instance().createReader(context, remote_file_metadata, read_buffer);
std::tie(remote_read_buffer->local_file_holder, read_buffer)
= ExternalDataSourceCache::instance().createReader(context, remote_file_metadata, read_buffer);
if (remote_read_buffer->local_file_holder == nullptr)
return read_buffer;
remote_read_buffer->remote_file_size = remote_file_metadata->file_size;
@ -62,14 +57,15 @@ bool RemoteReadBuffer::nextImpl()
{
auto start_offset = local_file_holder->file_buffer->getPosition();
auto end_offset = start_offset + local_file_holder->file_buffer->internalBuffer().size();
local_file_holder->file_cache_controller->waitMoreData(start_offset, end_offset);
local_file_holder->file_cache_controller->value().waitMoreData(start_offset, end_offset);
auto status = local_file_holder->file_buffer->next();
if (status)
{
BufferBase::set(local_file_holder->file_buffer->buffer().begin(),
local_file_holder->file_buffer->buffer().size(),
local_file_holder->file_buffer->offset());
BufferBase::set(
local_file_holder->file_buffer->buffer().begin(),
local_file_holder->file_buffer->buffer().size(),
local_file_holder->file_buffer->offset());
ProfileEvents::increment(ProfileEvents::ExternalDataSourceLocalCacheReadBytes, local_file_holder->file_buffer->available());
}
return status;
@ -84,11 +80,9 @@ off_t RemoteReadBuffer::seek(off_t offset, int whence)
* position Y (Y > X), it would fail.
*/
auto & file_buffer = local_file_holder->file_buffer;
local_file_holder->file_cache_controller->waitMoreData(offset, offset + file_buffer->internalBuffer().size());
local_file_holder->file_cache_controller->value().waitMoreData(offset, offset + file_buffer->internalBuffer().size());
auto ret = file_buffer->seek(offset, whence);
BufferBase::set(file_buffer->buffer().begin(),
file_buffer->buffer().size(),
file_buffer->offset());
BufferBase::set(file_buffer->buffer().begin(), file_buffer->buffer().size(), file_buffer->offset());
return ret;
}
@ -124,7 +118,8 @@ void ExternalDataSourceCache::recoverTask()
invalid_paths.emplace_back(path);
continue;
}
if (!lru_caches->trySet(path, cache_controller))
auto cache_load_func = [&] { return cache_controller; };
if (!lru_caches->getOrSet(path, cache_load_func))
{
invalid_paths.emplace_back(path);
}
@ -136,9 +131,7 @@ void ExternalDataSourceCache::recoverTask()
LOG_INFO(log, "Recovered from directory:{}", root_dir);
}
void ExternalDataSourceCache::initOnce(
ContextPtr context,
const String & root_dir_, size_t limit_size_, size_t bytes_read_before_flush_)
void ExternalDataSourceCache::initOnce(ContextPtr context, const String & root_dir_, size_t limit_size_, size_t bytes_read_before_flush_)
{
std::lock_guard lock(mutex);
if (isInitialized())
@ -149,7 +142,7 @@ void ExternalDataSourceCache::initOnce(
log, "Initializing local cache for remote data sources. Local cache root path: {}, cache size limit: {}", root_dir_, limit_size_);
root_dir = root_dir_;
local_cache_bytes_read_before_flush = bytes_read_before_flush_;
lru_caches = std::make_unique<CacheType>(limit_size_);
lru_caches = std::make_unique<RemoteFileCacheType>(limit_size_);
/// create if root_dir not exists
if (!fs::exists(fs::path(root_dir)))
@ -157,22 +150,21 @@ void ExternalDataSourceCache::initOnce(
fs::create_directories(fs::path(root_dir));
}
recover_task_holder = context->getSchedulePool().createTask("recover local cache metadata for remote files", [this]{ recoverTask(); });
recover_task_holder = context->getSchedulePool().createTask("recover local cache metadata for remote files", [this] { recoverTask(); });
recover_task_holder->activateAndSchedule();
}
String ExternalDataSourceCache::calculateLocalPath(IRemoteFileMetadataPtr metadata) const
{
// add version into the full_path, and not block to read the new version
String full_path = metadata->getName() + ":" + metadata->remote_path
+ ":" + metadata->getVersion();
String full_path = metadata->getName() + ":" + metadata->remote_path + ":" + metadata->getVersion();
UInt128 hashcode = sipHash128(full_path.c_str(), full_path.size());
String hashcode_str = getHexUIntLowercase(hashcode);
return fs::path(root_dir) / hashcode_str.substr(0, 3) / hashcode_str;
}
std::pair<std::unique_ptr<LocalFileHolder>, std::unique_ptr<ReadBuffer>>
ExternalDataSourceCache::createReader(ContextPtr context, IRemoteFileMetadataPtr remote_file_metadata, std::unique_ptr<ReadBuffer> & read_buffer)
std::pair<std::unique_ptr<LocalFileHolder>, std::unique_ptr<ReadBuffer>> ExternalDataSourceCache::createReader(
ContextPtr context, IRemoteFileMetadataPtr remote_file_metadata, std::unique_ptr<ReadBuffer> & read_buffer)
{
// If something is wrong on startup, rollback to read from the original ReadBuffer
if (!isInitialized())
@ -189,36 +181,43 @@ ExternalDataSourceCache::createReader(ContextPtr context, IRemoteFileMetadataPtr
if (cache)
{
// the remote file has been updated, need to redownload
if (!cache->isValid() || cache->isModified(remote_file_metadata))
if (!cache->value().isValid() || cache->value().isModified(remote_file_metadata))
{
LOG_TRACE(
log,
"Remote file ({}) has been updated. Last saved modification time: {}, actual last modification time: {}",
remote_path,
std::to_string(cache->getLastModificationTimestamp()),
std::to_string(cache->value().getLastModificationTimestamp()),
std::to_string(last_modification_timestamp));
cache->markInvalid();
cache->value().markInvalid();
cache.reset();
}
else
{
return {std::make_unique<LocalFileHolder>(cache), nullptr};
return {std::make_unique<LocalFileHolder>(std::move(cache)), nullptr};
}
}
if (!fs::exists(local_path))
fs::create_directories(local_path);
// cache is not found or is invalid
auto new_cache = std::make_shared<RemoteCacheController>(remote_file_metadata, local_path, local_cache_bytes_read_before_flush);
if (!lru_caches->trySet(local_path, new_cache))
// cache is not found or is invalid, try to remove it at first
lru_caches->tryRemove(local_path);
auto new_cache_controller
= std::make_shared<RemoteCacheController>(remote_file_metadata, local_path, local_cache_bytes_read_before_flush);
auto new_cache = lru_caches->getOrSet(local_path, [&] { return new_cache_controller; });
if (!new_cache)
{
LOG_ERROR(log, "Insert the new cache failed. new file size:{}, current total size:{}",
remote_file_metadata->file_size,
lru_caches->weight());
LOG_ERROR(
log,
"Insert the new cache failed. new file size:{}, current total size:{}",
remote_file_metadata->file_size,
lru_caches->weight());
return {nullptr, std::move(read_buffer)};
}
new_cache->startBackgroundDownload(std::move(read_buffer), context->getSchedulePool());
return {std::make_unique<LocalFileHolder>(new_cache), nullptr};
new_cache->value().startBackgroundDownload(std::move(read_buffer), context->getSchedulePool());
return {std::make_unique<LocalFileHolder>(std::move(new_cache)), nullptr};
}
}

View File

@ -1,40 +1,42 @@
#pragma once
#include <mutex>
#include <condition_variable>
#include <filesystem>
#include <list>
#include <set>
#include <map>
#include <memory>
#include <filesystem>
#include <mutex>
#include <set>
#include <Core/BackgroundSchedulePool.h>
#include <Poco/Logger.h>
#include <Common/LRUCache.h>
#include <Common/ErrorCodes.h>
#include <Common/ThreadPool.h>
#include <IO/ReadBuffer.h>
#include <IO/BufferWithOwnMemory.h>
#include <IO/createReadBufferFromFileBase.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteBufferFromFileBase.h>
#include <IO/ReadBuffer.h>
#include <IO/ReadBufferFromFileBase.h>
#include <IO/ReadSettings.h>
#include <IO/SeekableReadBuffer.h>
#include <Storages/Cache/IRemoteFileMetadata.h>
#include <condition_variable>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteBufferFromFileBase.h>
#include <IO/createReadBufferFromFileBase.h>
#include <Interpreters/Context.h>
#include <boost/core/noncopyable.hpp>
#include <Storages/Cache/IRemoteFileMetadata.h>
#include <Storages/Cache/RemoteCacheController.h>
#include <Storages/Cache/RemoteFileCachePolicy.h>
#include <boost/core/noncopyable.hpp>
#include <Poco/Logger.h>
#include <Common/ErrorCodes.h>
#include <Common/LRUResourceCache.h>
#include <Common/ThreadPool.h>
namespace DB
{
using RemoteFileCacheType = LRUResourceCache<String, RemoteCacheController, RemoteFileCacheWeightFunction>;
class LocalFileHolder
{
public:
explicit LocalFileHolder(std::shared_ptr<RemoteCacheController> cache_controller);
~LocalFileHolder();
explicit LocalFileHolder(RemoteFileCacheType::MappedHolderPtr cache_controller);
~LocalFileHolder() = default;
std::shared_ptr<RemoteCacheController> file_cache_controller;
RemoteFileCacheType::MappedHolderPtr file_cache_controller;
std::unique_ptr<ReadBufferFromFileBase> file_buffer;
};
@ -55,11 +57,10 @@ private:
size_t remote_file_size = 0;
};
class ExternalDataSourceCache : private boost::noncopyable
{
public:
using CacheType = LRUCache<String, RemoteCacheController, std::hash<String>,
RemoteFileCacheWeightFunction, RemoteFileCacheEvictPolicy>;
~ExternalDataSourceCache();
// global instance
static ExternalDataSourceCache & instance();
@ -84,7 +85,7 @@ private:
std::atomic<bool> initialized = false;
std::atomic<size_t> total_size;
std::mutex mutex;
std::unique_ptr<CacheType> lru_caches;
std::unique_ptr<RemoteFileCacheType> lru_caches;
Poco::Logger * log = &Poco::Logger::get("ExternalDataSourceCache");

View File

@ -1,13 +1,13 @@
#include <Storages/Cache/RemoteCacheController.h>
#include <Storages/Cache/ExternalDataSourceCache.h>
#include <Storages/Cache/RemoteFileMetadataFactory.h>
#include <fstream>
#include <IO/ReadBuffer.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadBuffer.h>
#include <IO/ReadSettings.h>
#include <Storages/Cache/ExternalDataSourceCache.h>
#include <Storages/Cache/RemoteCacheController.h>
#include <Storages/Cache/RemoteFileMetadataFactory.h>
#include <Poco/JSON/JSON.h>
#include <Poco/JSON/Parser.h>
#include <fstream>
namespace DB
{
@ -55,7 +55,11 @@ std::shared_ptr<RemoteCacheController> RemoteCacheController::recover(const std:
readStringUntilEOF(metadata_content, file_readbuffer);
if (!cache_controller->file_metadata_ptr->fromString(metadata_content))
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Invalid metadata file({}) for meta class {}", local_path_.string(), cache_controller->metadata_class);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Invalid metadata file({}) for meta class {}",
local_path_.string(),
cache_controller->metadata_class);
}
cache_controller->current_offset = fs::file_size(local_path_ / "data.bin");
@ -65,9 +69,7 @@ std::shared_ptr<RemoteCacheController> RemoteCacheController::recover(const std:
}
RemoteCacheController::RemoteCacheController(
IRemoteFileMetadataPtr file_metadata_,
const std::filesystem::path & local_path_,
size_t cache_bytes_before_flush_)
IRemoteFileMetadataPtr file_metadata_, const std::filesystem::path & local_path_, size_t cache_bytes_before_flush_)
: file_metadata_ptr(file_metadata_)
, local_path(local_path_)
, valid(true)
@ -134,8 +136,7 @@ void RemoteCacheController::startBackgroundDownload(std::unique_ptr<ReadBuffer>
data_file_writer = std::make_unique<WriteBufferFromFile>((fs::path(local_path) / "data.bin").string());
flush(true);
ReadBufferPtr in_readbuffer(in_readbuffer_.release());
download_task_holder = thread_pool.createTask("download remote file",
[this, in_readbuffer]{ backgroundDownload(in_readbuffer); });
download_task_holder = thread_pool.createTask("download remote file", [this, in_readbuffer] { backgroundDownload(in_readbuffer); });
download_task_holder->activateAndSchedule();
}
@ -197,6 +198,7 @@ RemoteCacheController::~RemoteCacheController()
{
if (download_task_holder)
download_task_holder->deactivate();
close();
}
void RemoteCacheController::close()
@ -213,32 +215,7 @@ std::unique_ptr<ReadBufferFromFileBase> RemoteCacheController::allocFile()
//settings.local_fs_method = LocalFSReadMethod::read;
auto file_buffer = createReadBufferFromFileBase((local_path / "data.bin").string(), settings);
if (file_buffer)
{
std::lock_guard lock{mutex};
opened_file_buffer_refs.insert(reinterpret_cast<uintptr_t>(file_buffer.get()));
}
return file_buffer;
}
void RemoteCacheController::deallocFile(std::unique_ptr<ReadBufferFromFileBase> file_buffer)
{
if (!file_buffer)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Try to release a null file buffer for {}", local_path.string());
}
auto buffer_ref = reinterpret_cast<uintptr_t>(file_buffer.get());
std::lock_guard lock{mutex};
auto it = opened_file_buffer_refs.find(buffer_ref);
if (it == opened_file_buffer_refs.end())
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Try to deallocate file with invalid handler remote path: {}, local path: {}",
file_metadata_ptr->remote_path,
local_path.string());
}
opened_file_buffer_refs.erase(it);
}
}

View File

@ -1,17 +1,17 @@
#pragma once
#include <mutex>
#include <set>
#include <filesystem>
#include <map>
#include <memory>
#include <filesystem>
#include <mutex>
#include <set>
#include <Core/BackgroundSchedulePool.h>
#include <Common/ErrorCodes.h>
#include <Storages/Cache/IRemoteFileMetadata.h>
#include <IO/ReadBufferFromFileBase.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteBufferFromFileBase.h>
#include <Poco/Logger.h>
#include <Storages/Cache/IRemoteFileMetadata.h>
#include <base/logger_useful.h>
#include <Poco/Logger.h>
#include <Common/ErrorCodes.h>
namespace DB
{
@ -22,35 +22,21 @@ public:
{
TO_DOWNLOAD = 0,
DOWNLOADING = 1,
DOWNLOADED = 2,
DOWNLOADED = 2,
};
RemoteCacheController(
IRemoteFileMetadataPtr file_metadata_,
const std::filesystem::path & local_path_,
size_t cache_bytes_before_flush_);
IRemoteFileMetadataPtr file_metadata_, const std::filesystem::path & local_path_, size_t cache_bytes_before_flush_);
~RemoteCacheController();
// recover from local disk
static std::shared_ptr<RemoteCacheController>
recover(const std::filesystem::path & local_path);
static std::shared_ptr<RemoteCacheController> recover(const std::filesystem::path & local_path);
/**
* Called by LocalCachedFileReader, must be used in pair
* The second value of the return tuple is the local_path to store file.
*/
std::unique_ptr<ReadBufferFromFileBase> allocFile();
void deallocFile(std::unique_ptr<ReadBufferFromFileBase> buffer);
/**
* when allocFile be called, count++. deallocFile be called, count--.
* the local file could be deleted only count==0
*/
inline bool closable()
{
std::lock_guard lock{mutex};
return opened_file_buffer_refs.empty();
}
void close();
/**
@ -92,8 +78,6 @@ private:
std::mutex mutex;
std::condition_variable more_data_signal;
std::set<uintptr_t> opened_file_buffer_refs; // refer to a buffer address
String metadata_class;
LocalFileStatus file_status = TO_DOWNLOAD; // for tracking download process
IRemoteFileMetadataPtr file_metadata_ptr;

View File

@ -3,23 +3,7 @@ namespace DB
{
struct RemoteFileCacheWeightFunction
{
size_t operator()(const RemoteCacheController & cache) const
{
return cache.getFileSize();
}
};
struct RemoteFileCacheEvictPolicy
{
bool canRelease(std::shared_ptr<RemoteCacheController> cache) const
{
return !cache || cache->closable();
}
void release(std::shared_ptr<RemoteCacheController> cache)
{
if (cache)
cache->close();
}
size_t operator()(const RemoteCacheController & cache) const { return cache.getFileSize(); }
};
}

View File

@ -111,12 +111,7 @@ void HiveMetastoreClient::clearTableMetadata(const String & db_name, const Strin
std::lock_guard lock{mutex};
HiveTableMetadataPtr metadata = table_metadata_cache.get(cache_key);
if (metadata)
{
if (!table_metadata_cache.tryRemove(cache_key))
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Try to clear table metadata failed.");
}
}
table_metadata_cache.remove(cache_key);
}
void HiveMetastoreClient::setClient(std::shared_ptr<Apache::Hadoop::Hive::ThriftHiveMetastoreClient> client_)