mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-13 18:02:24 +00:00
refector
1. Make the lru cache pilicy in RemoteReadBufferCache into Common/UnreleasableLRUCache.h 2. If try to use local cached file fail, the RemoteReadBuffer::create() return the original ReadBuffer 3. Remove LocalFileReader 4. RemoteCacheController does not hold a context now, and the download process starts in RemoteReadBufferCache once a new RemoteCacheController is create successly 5. Make an abstract class RemoteFileMetaDataBase for descripting a remote file meta data. 6. Remote file meta data need to a version, can cache multi-version for a file now
This commit is contained in:
parent
01940c3f01
commit
3c56a5deac
283
src/Common/UnreleasableLRUCache.h
Normal file
283
src/Common/UnreleasableLRUCache.h
Normal file
@ -0,0 +1,283 @@
|
||||
#pragma once
|
||||
|
||||
#include <unordered_map>
|
||||
#include <list>
|
||||
#include <memory>
|
||||
#include <chrono>
|
||||
#include <mutex>
|
||||
#include <atomic>
|
||||
|
||||
#include <Poco/Logger.h>
|
||||
#include <base/logger_useful.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
template <typename T>
|
||||
struct TrivialUnreleasableLRUCacheWeightFunction
|
||||
{
|
||||
size_t operator()(const T &) const
|
||||
{
|
||||
return 1;
|
||||
}
|
||||
};
|
||||
|
||||
enum class CacheEvictStatus
|
||||
{
|
||||
CAN_EVITCT, // a key can be evicted
|
||||
TERMINATE_EVICT, // stop the evicting process
|
||||
SKIP_EVICT, // skip current value and keep iterating
|
||||
};
|
||||
|
||||
template <typename T>
|
||||
struct TrivialUnreleasableLRUCacheEvitPolicy
|
||||
{
|
||||
CacheEvictStatus canRelease(const T &)
|
||||
{
|
||||
return CacheEvictStatus::CAN_EVITCT;
|
||||
}
|
||||
|
||||
void release(T & )
|
||||
{
|
||||
}
|
||||
};
|
||||
|
||||
/*
|
||||
* Another version LRU Cache。
|
||||
* A value can only be evicted or be updated if it is releasable. If there is no releasable value,
|
||||
* insert or update will fail.
|
||||
*/
|
||||
template <typename TKey,
|
||||
typename TMapped,
|
||||
typename HashFunction = std::hash<TKey>,
|
||||
typename WeightFunction = TrivialUnreleasableLRUCacheWeightFunction<TMapped>,
|
||||
typename EvictPolicy = TrivialUnreleasableLRUCacheEvitPolicy<TMapped>>
|
||||
class UnreleasableLRUCache
|
||||
{
|
||||
public:
|
||||
using Key = TKey;
|
||||
using Mapped = TMapped;
|
||||
using MappedPtr = std::shared_ptr<Mapped>;
|
||||
|
||||
/** Initialize LRUCache with max_size and max_elements_size.
|
||||
* max_elements_size == 0 means no elements size restrictions.
|
||||
*/
|
||||
UnreleasableLRUCache(size_t max_size_, size_t max_elements_size_ = 0)
|
||||
: max_size(std::max(static_cast<size_t>(1), max_size_))
|
||||
, max_elements_size(max_elements_size_)
|
||||
{}
|
||||
|
||||
MappedPtr get(const Key & key)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
|
||||
auto res = getImpl(key, lock);
|
||||
if (res)
|
||||
++hits;
|
||||
else
|
||||
++misses;
|
||||
return res;
|
||||
}
|
||||
|
||||
/*
|
||||
* Fail on two cases
|
||||
* 1) the key exists, but the old value is not releasable
|
||||
* 2) the key not exists, but there is not enough space for it after trying to evict some least recently used values.
|
||||
*/
|
||||
bool set(const Key & key, const MappedPtr & mapped)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
|
||||
return setImpl(key, mapped, lock);
|
||||
}
|
||||
|
||||
void getStats(size_t & out_hits, size_t & out_misses) const
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
out_hits = hits;
|
||||
out_misses = misses;
|
||||
}
|
||||
|
||||
size_t weight() const
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
return current_size;
|
||||
}
|
||||
|
||||
size_t count() const
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
return cells.size();
|
||||
}
|
||||
|
||||
size_t maxSize() const
|
||||
{
|
||||
return max_size;
|
||||
}
|
||||
|
||||
void reset()
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
queue.clear();
|
||||
cells.clear();
|
||||
current_size = 0;
|
||||
hits = 0;
|
||||
misses = 0;
|
||||
}
|
||||
|
||||
virtual ~UnreleasableLRUCache() {}
|
||||
|
||||
protected:
|
||||
using LRUQueue = std::list<Key>;
|
||||
using LRUQueueIterator = typename LRUQueue::iterator;
|
||||
|
||||
struct Cell
|
||||
{
|
||||
MappedPtr value;
|
||||
size_t size;
|
||||
LRUQueueIterator queue_iterator;
|
||||
};
|
||||
|
||||
using Cells = std::unordered_map<Key, Cell, HashFunction>;
|
||||
|
||||
Cells cells;
|
||||
|
||||
mutable std::mutex mutex;
|
||||
private:
|
||||
|
||||
LRUQueue queue;
|
||||
|
||||
/// Total weight of values.
|
||||
size_t current_size = 0;
|
||||
const size_t max_size;
|
||||
const size_t max_elements_size;
|
||||
|
||||
std::atomic<size_t> hits {0};
|
||||
std::atomic<size_t> misses {0};
|
||||
|
||||
WeightFunction weight_function;
|
||||
EvictPolicy evict_policy;
|
||||
|
||||
MappedPtr getImpl(const Key & key, [[maybe_unused]] std::lock_guard<std::mutex> & cache_lock)
|
||||
{
|
||||
auto it = cells.find(key);
|
||||
if (it == cells.end())
|
||||
{
|
||||
return MappedPtr();
|
||||
}
|
||||
|
||||
Cell & cell = it->second;
|
||||
|
||||
/// Move the key to the end of the queue. The iterator remains valid.
|
||||
queue.splice(queue.end(), queue, cell.queue_iterator);
|
||||
|
||||
return cell.value;
|
||||
}
|
||||
|
||||
bool setImpl(const Key & key, const MappedPtr & mapped, [[maybe_unused]] std::lock_guard<std::mutex> & cache_lock)
|
||||
{
|
||||
auto [it, inserted] = cells.emplace(std::piecewise_construct,
|
||||
std::forward_as_tuple(key),
|
||||
std::forward_as_tuple());
|
||||
|
||||
Cell & cell = it->second;
|
||||
|
||||
if (inserted)
|
||||
{
|
||||
auto weight = mapped ? weight_function(*mapped) : 0;
|
||||
if (!removeOverflow(weight))
|
||||
{
|
||||
// cannot insert this new value
|
||||
cells.erase(it);
|
||||
return false;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
cell.queue_iterator = queue.insert(queue.end(), key);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
cells.erase(it);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if (evict_policy.canRelease(*cell.value) != CacheEvictStatus::CAN_EVITCT)
|
||||
{
|
||||
// the old value is not releasable
|
||||
return false;
|
||||
}
|
||||
evict_policy.release(*cell.value);
|
||||
current_size -= cell.size;
|
||||
queue.splice(queue.end(), queue, cell.queue_iterator);
|
||||
}
|
||||
|
||||
cell.value = mapped;
|
||||
cell.size = cell.value ? weight_function(*cell.value) : 0;
|
||||
current_size += cell.size;
|
||||
|
||||
removeOverflow(0);
|
||||
return true;
|
||||
}
|
||||
|
||||
// Or make your own implementation
|
||||
virtual bool removeOverflow(size_t more_size)
|
||||
{
|
||||
size_t current_weight_lost = 0;
|
||||
size_t queue_size = cells.size();
|
||||
|
||||
auto key_it = queue.begin();
|
||||
while ((current_size + more_size > max_size || (max_elements_size != 0 && queue_size > max_elements_size)) && (queue_size > 1)
|
||||
&& key_it != queue.end())
|
||||
{
|
||||
const Key & key = *key_it;
|
||||
|
||||
auto it = cells.find(key);
|
||||
if (it == cells.end())
|
||||
{
|
||||
LOG_ERROR(&Poco::Logger::get("UnreleasableLRUCache"), "UnreleasableLRUCache became inconsistent. There must be a bug in it.");
|
||||
abort();
|
||||
}
|
||||
|
||||
const auto & cell = it->second;
|
||||
auto cache_evict_status = evict_policy.canRelease(*(cell.value));
|
||||
if (cache_evict_status == CacheEvictStatus::CAN_EVITCT)
|
||||
{
|
||||
evict_policy.release(*(cell.value));
|
||||
current_size -= cell.size;
|
||||
current_weight_lost += cell.size;
|
||||
|
||||
cells.erase(it);
|
||||
key_it = queue.erase(key_it);
|
||||
--queue_size;
|
||||
}
|
||||
else if (cache_evict_status == CacheEvictStatus::SKIP_EVICT)
|
||||
{
|
||||
key_it++;
|
||||
continue;
|
||||
}
|
||||
else if (cache_evict_status == CacheEvictStatus::TERMINATE_EVICT)
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
onRemoveOverflowWeightLoss(current_weight_lost);
|
||||
|
||||
if (current_size > (1ull << 63))
|
||||
{
|
||||
LOG_ERROR(&Poco::Logger::get("UnreleasableLRUCache"), "UnreleasableLRUCache became inconsistent. There must be a bug in it.");
|
||||
abort();
|
||||
}
|
||||
return !(current_size + more_size > max_size || (max_elements_size != 0 && queue_size > max_elements_size));
|
||||
}
|
||||
|
||||
/// Override this method if you want to track how much weight was lost in removeOverflow method.
|
||||
virtual void onRemoveOverflowWeightLoss(size_t /*weight_loss*/) {}
|
||||
};
|
||||
|
||||
|
||||
}
|
@ -24,22 +24,21 @@ public:
|
||||
{
|
||||
}
|
||||
virtual ~RemoteFileMetaDataBase();
|
||||
virtual String getClassName() = 0; //class name
|
||||
virtual String getClassName() const = 0; //class name
|
||||
// methods for basic information
|
||||
inline String getSchema() { return schema; }
|
||||
inline String getCluster() { return cluster; }
|
||||
inline size_t getFileSize() { return file_size; }
|
||||
inline String getRemotePath() { return remote_path; }
|
||||
inline UInt64 getLastModificationTimestamp() { return last_modification_timestamp; }
|
||||
// create a new object
|
||||
virtual std::shared_ptr<RemoteFileMetaDataBase> clone() = 0;
|
||||
inline String getSchema() const { return schema; }
|
||||
inline String getCluster() const { return cluster; }
|
||||
inline size_t getFileSize() const { return file_size; }
|
||||
inline String getRemotePath() const { return remote_path; }
|
||||
inline UInt64 getLastModificationTimestamp() const { return last_modification_timestamp; }
|
||||
|
||||
// deserialize
|
||||
virtual bool fromString(const String &buf) = 0;
|
||||
// serialize
|
||||
virtual String toString() = 0;
|
||||
// to compare two meta datas for detecting file changes
|
||||
virtual bool equal(std::shared_ptr<RemoteFileMetaDataBase> b) = 0;
|
||||
virtual String toString() const = 0;
|
||||
|
||||
// used for comparing two file meta datas are the same or not.
|
||||
virtual String getVersion() const = 0;
|
||||
protected:
|
||||
String schema;
|
||||
String cluster;
|
||||
@ -51,11 +50,11 @@ protected:
|
||||
using RemoteFileMetaDataBasePtr = std::shared_ptr<RemoteFileMetaDataBase>;
|
||||
|
||||
/*
|
||||
* How to register a subclass into factory and use it ?
|
||||
* 1) define your own subclass derive from RemoteFileMetaDataBase. Notice! the getClassName() must be the same
|
||||
as your subclass name.
|
||||
* 2) in a .cpp file, call REGISTTER_REMOTE_FILE_META_DATA_CLASS(subclass).
|
||||
3) call RemoteFileMetaDataFactory::instance().create_class(subclass_name) where you want to make a new object
|
||||
* How to register a subclass into the factory and use it ?
|
||||
* 1) define your own subclass derive from RemoteFileMetaDataBase. Notice! the getClassName() must be the same
|
||||
* as your subclass name.
|
||||
* 2) in a .cpp file, call REGISTTER_REMOTE_FILE_META_DATA_CLASS(subclass),
|
||||
* 3) call RemoteFileMetaDataFactory::instance().create_class(subclass_name) where you want to make a new object
|
||||
*/
|
||||
|
||||
class RemoteFileMetaDataFactory : private boost::noncopyable
|
||||
|
@ -53,18 +53,19 @@ std::shared_ptr<RemoteCacheController> RemoteCacheController::recover(const std:
|
||||
if (!cache_controller->loadInnerInformation(local_path_ / "info.txt")
|
||||
|| cache_controller->file_status != DOWNLOADED)
|
||||
{
|
||||
LOG_INFO(log, "recover cached file failed. local path:{}", local_path_.string());
|
||||
LOG_INFO(log, "Recover cached file failed. local path:{}", local_path_.string());
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
cache_controller->file_meta_data_ptr = RemoteFileMetaDataFactory::instance().create_class(cache_controller->meta_data_class);
|
||||
if (!cache_controller->file_meta_data_ptr)
|
||||
{
|
||||
// do not load this invalid cached file and clear it
|
||||
// do not load this invalid cached file and clear it. the clear action is in
|
||||
// RemoteReadBufferCache::recoverCachedFilesMetaData(), because deleting directories during iteration will
|
||||
// cause unexpected behaviors
|
||||
LOG_ERROR(log, "Cannot create the meta data class : {}. The cached file is invalid and will be remove. path:{}",
|
||||
cache_controller->meta_data_class,
|
||||
local_path_.string());
|
||||
fs::remove_all(local_path_);
|
||||
return nullptr;
|
||||
}
|
||||
std::ifstream meta_data_file(local_path_ / "meta_data.txt");
|
||||
@ -73,7 +74,6 @@ std::shared_ptr<RemoteCacheController> RemoteCacheController::recover(const std:
|
||||
{
|
||||
LOG_ERROR(log, "Cannot load the meta data. The cached file is invalid and will be remove. path:{}",
|
||||
local_path_.string());
|
||||
fs::remove_all(local_path_);
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
@ -93,6 +93,8 @@ RemoteCacheController::RemoteCacheController(
|
||||
, local_cache_bytes_read_before_flush(cache_bytes_before_flush_)
|
||||
, current_offset(0)
|
||||
{
|
||||
// on recover, file_meta_data_ptr is null, but it will be allocated after loading from meta_data.txt
|
||||
// when we allocate a whole new file cache , file_meta_data_ptr must not be null.
|
||||
if (file_meta_data_ptr)
|
||||
{
|
||||
std::ofstream meta_data_file(local_path_ / "meta_data.txt", std::ios::out);
|
||||
@ -129,7 +131,7 @@ RemoteReadBufferCacheError RemoteCacheController::waitMoreData(size_t start_offs
|
||||
|
||||
bool RemoteCacheController::checkFileChanged(RemoteFileMetaDataBasePtr file_meta_data_)
|
||||
{
|
||||
return !file_meta_data_ptr->equal(file_meta_data_);
|
||||
return !(file_meta_data_ptr->getVersion() == file_meta_data_->getVersion());
|
||||
}
|
||||
|
||||
void RemoteCacheController::startBackgroundDownload(std::shared_ptr<ReadBuffer> input_readbuffer, BackgroundSchedulePool & thread_pool)
|
||||
@ -180,7 +182,6 @@ void RemoteCacheController::flush(bool need_flush_status)
|
||||
{
|
||||
if (data_file_writer)
|
||||
{
|
||||
LOG_DEBUG(&Poco::Logger::get("RemoteCacheController"),"flush file. offset:{}, file:{}. total_size:{}", current_offset, local_path.string(), file_meta_data_ptr->getFileSize());
|
||||
data_file_writer->sync();
|
||||
}
|
||||
if (need_flush_status)
|
||||
@ -196,7 +197,11 @@ void RemoteCacheController::flush(bool need_flush_status)
|
||||
}
|
||||
}
|
||||
|
||||
RemoteCacheController::~RemoteCacheController() = default;
|
||||
RemoteCacheController::~RemoteCacheController()
|
||||
{
|
||||
if (download_task_holder)
|
||||
download_task_holder->deactivate();
|
||||
}
|
||||
|
||||
void RemoteCacheController::close()
|
||||
{
|
||||
@ -240,17 +245,17 @@ void RemoteCacheController::deallocFile(std::unique_ptr<ReadBufferFromFileBase>
|
||||
opened_file_buffer_refs.erase(it);
|
||||
}
|
||||
|
||||
// the size need be equal to the original buffer
|
||||
RemoteReadBuffer::RemoteReadBuffer(size_t buff_size) : BufferWithOwnMemory<SeekableReadBufferWithSize>(buff_size)
|
||||
{
|
||||
}
|
||||
|
||||
RemoteReadBuffer::~RemoteReadBuffer()
|
||||
{
|
||||
file_cache_controller->deallocFile(std::move(file_buffer));
|
||||
if (file_cache_controller)
|
||||
file_cache_controller->deallocFile(std::move(file_buffer));
|
||||
}
|
||||
|
||||
std::unique_ptr<RemoteReadBuffer> RemoteReadBuffer::create(ContextPtr context, RemoteFileMetaDataBasePtr remote_file_meta_data, std::unique_ptr<ReadBuffer> read_buffer)
|
||||
std::unique_ptr<ReadBuffer> RemoteReadBuffer::create(ContextPtr context, RemoteFileMetaDataBasePtr remote_file_meta_data, std::unique_ptr<ReadBuffer> read_buffer)
|
||||
{
|
||||
auto * log = &Poco::Logger::get("RemoteReadBuffer");
|
||||
size_t buff_size = DBMS_DEFAULT_BUFFER_SIZE;
|
||||
@ -269,15 +274,14 @@ std::unique_ptr<RemoteReadBuffer> RemoteReadBuffer::create(ContextPtr context, R
|
||||
|
||||
auto remote_path = remote_file_meta_data->getRemotePath();
|
||||
auto remote_read_buffer = std::make_unique<RemoteReadBuffer>(buff_size);
|
||||
auto * raw_readbuffer_ptr = read_buffer.release();
|
||||
std::shared_ptr<ReadBuffer> shared_readbuffer_ptr(raw_readbuffer_ptr);
|
||||
RemoteReadBufferCacheError error;
|
||||
|
||||
std::tie(remote_read_buffer->file_cache_controller, error) = RemoteReadBufferCache::instance().createReader(context, remote_file_meta_data, shared_readbuffer_ptr);
|
||||
std::tie(remote_read_buffer->file_cache_controller, read_buffer, error) = RemoteReadBufferCache::instance().createReader(context, remote_file_meta_data, read_buffer);
|
||||
if (remote_read_buffer->file_cache_controller == nullptr)
|
||||
{
|
||||
LOG_ERROR(log, "Failed to allocate local file for remote path: {}, reason: {}", remote_path, error);
|
||||
remote_read_buffer->original_read_buffer = shared_readbuffer_ptr;
|
||||
LOG_ERROR(log, "Failed to allocate local file for remote path: {}, reason: {}.", remote_path, error);
|
||||
// read_buffer is the input one.
|
||||
return read_buffer;
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -286,51 +290,33 @@ std::unique_ptr<RemoteReadBuffer> RemoteReadBuffer::create(ContextPtr context, R
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Create file readbuffer failed. {}",
|
||||
remote_read_buffer->file_cache_controller->getLocalPath().string());
|
||||
}
|
||||
remote_read_buffer->remote_file_size = remote_file_meta_data->getFileSize();
|
||||
return remote_read_buffer;
|
||||
}
|
||||
|
||||
bool RemoteReadBuffer::nextImpl()
|
||||
{
|
||||
if (file_buffer)
|
||||
{
|
||||
auto start_offset = file_buffer->getPosition();
|
||||
auto end_offset = start_offset + file_buffer->internalBuffer().size();
|
||||
LOG_DEBUG(&Poco::Logger::get("RemoteReadBuffer"), "nextImpl. start:{}, end:{}, file:{}, total_size:{}, remote_path:{}",
|
||||
start_offset, end_offset, file_buffer->getFileName(), file_cache_controller->getFileMetaData()->getFileSize(),
|
||||
file_cache_controller->getFileMetaData()->getRemotePath());
|
||||
file_cache_controller->waitMoreData(start_offset, end_offset);
|
||||
auto start_offset = file_buffer->getPosition();
|
||||
auto end_offset = start_offset + file_buffer->internalBuffer().size();
|
||||
file_cache_controller->waitMoreData(start_offset, end_offset);
|
||||
|
||||
auto status = file_buffer->next();
|
||||
if (status)
|
||||
BufferBase::set(file_buffer->buffer().begin(),
|
||||
file_buffer->buffer().size(),
|
||||
file_buffer->offset());
|
||||
return status;
|
||||
}
|
||||
else
|
||||
{
|
||||
// In the case we cannot use local cache, read from the original readbuffer directly
|
||||
if (!original_read_buffer)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Original read buffer is not initialized. It's a bug");
|
||||
|
||||
auto status = original_read_buffer->next();
|
||||
// We don't need to worry about the memory buffer allocated in RemoteReadBuffer, since it is owned by
|
||||
// BufferWithOwnMemory, BufferWithOwnMemory would release it.
|
||||
if (status)
|
||||
BufferBase::set(original_read_buffer->buffer().begin(), original_read_buffer->buffer().size(), original_read_buffer->offset());
|
||||
return status;
|
||||
}
|
||||
auto status = file_buffer->next();
|
||||
if (status)
|
||||
BufferBase::set(file_buffer->buffer().begin(),
|
||||
file_buffer->buffer().size(),
|
||||
file_buffer->offset());
|
||||
return status;
|
||||
}
|
||||
|
||||
off_t RemoteReadBuffer::seek(off_t offset, int whence)
|
||||
{
|
||||
if (!file_buffer)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot call seek() in this buffer. It's a bug!");
|
||||
/*
|
||||
* Need to wait here. For example, the current file has been download at position X, but here we try to seek to
|
||||
* postition Y (Y > X), it would fail.
|
||||
*/
|
||||
file_cache_controller->waitMoreData(offset, offset + file_buffer->internalBuffer().size());
|
||||
LOG_DEBUG(&Poco::Logger::get("RemoteReadBuffer"), "seek. offset:{}. file:{}, total_size:{}", offset, file_buffer->getFileName(),
|
||||
file_cache_controller->getFileMetaData()->getFileSize());
|
||||
auto ret = file_buffer->seek(offset, whence);
|
||||
BufferBase::set(file_buffer->buffer().begin(),
|
||||
file_buffer->buffer().size(),
|
||||
@ -360,15 +346,24 @@ void RemoteReadBufferCache::recoverCachedFilesMetaData(
|
||||
{
|
||||
if (current_depth >= max_depth)
|
||||
{
|
||||
std::vector<fs::path> invalid_pathes;
|
||||
for (auto const & dir : fs::directory_iterator{current_path})
|
||||
{
|
||||
String path = dir.path();
|
||||
auto cache_controller = RemoteCacheController::recover(path);
|
||||
if (!cache_controller)
|
||||
{
|
||||
invalid_pathes.emplace_back(path);
|
||||
continue;
|
||||
auto & cell = caches[path];
|
||||
cell.cache_controller = cache_controller;
|
||||
cell.key_iterator = keys.insert(keys.end(), path);
|
||||
}
|
||||
if (!lru_caches->set(path, cache_controller))
|
||||
{
|
||||
invalid_pathes.emplace_back(path);
|
||||
}
|
||||
}
|
||||
for (auto & path : invalid_pathes)
|
||||
{
|
||||
fs::remove_all(path);
|
||||
}
|
||||
return;
|
||||
}
|
||||
@ -384,7 +379,7 @@ void RemoteReadBufferCache::recoverTask()
|
||||
std::lock_guard lock(mutex);
|
||||
recoverCachedFilesMetaData(root_dir, 1, 2);
|
||||
initialized = true;
|
||||
LOG_TRACE(log, "Recovered from directory:{}", root_dir);
|
||||
LOG_INFO(log, "Recovered from directory:{}", root_dir);
|
||||
}
|
||||
|
||||
void RemoteReadBufferCache::initOnce(
|
||||
@ -394,8 +389,8 @@ void RemoteReadBufferCache::initOnce(
|
||||
LOG_INFO(
|
||||
log, "Initializing local cache for remote data sources. Local cache root path: {}, cache size limit: {}", root_dir_, limit_size_);
|
||||
root_dir = root_dir_;
|
||||
limit_size = limit_size_;
|
||||
local_cache_bytes_read_before_flush = bytes_read_before_flush_;
|
||||
lru_caches = std::make_unique<CacheType>(limit_size_);
|
||||
|
||||
/// create if root_dir not exists
|
||||
if (!fs::exists(fs::path(root_dir) / ""))
|
||||
@ -413,130 +408,62 @@ void RemoteReadBufferCache::initOnce(
|
||||
|
||||
String RemoteReadBufferCache::calculateLocalPath(RemoteFileMetaDataBasePtr meta_data) const
|
||||
{
|
||||
String full_path = meta_data->getSchema() + ":" + meta_data->getCluster() + ":" + meta_data->getRemotePath();
|
||||
// add version into the full_path, and not block to read the new version
|
||||
String full_path = meta_data->getSchema() + ":" + meta_data->getCluster() + ":" + meta_data->getRemotePath()
|
||||
+ ":" + meta_data->getVersion();
|
||||
UInt128 hashcode = sipHash128(full_path.c_str(), full_path.size());
|
||||
String hashcode_str = getHexUIntLowercase(hashcode);
|
||||
return fs::path(root_dir) / hashcode_str.substr(0, 3) / hashcode_str;
|
||||
}
|
||||
|
||||
std::pair<RemoteCacheControllerPtr, RemoteReadBufferCacheError>
|
||||
RemoteReadBufferCache::createReader(ContextPtr context, RemoteFileMetaDataBasePtr remote_file_meta_data, std::shared_ptr<ReadBuffer> & read_buffer)
|
||||
std::tuple<RemoteCacheControllerPtr, std::unique_ptr<ReadBuffer>, RemoteReadBufferCacheError>
|
||||
RemoteReadBufferCache::createReader(ContextPtr context, RemoteFileMetaDataBasePtr remote_file_meta_data, std::unique_ptr<ReadBuffer> & read_buffer)
|
||||
{
|
||||
LOG_TRACE(log, "createReader. {}", remote_file_meta_data->toString());
|
||||
// If something is wrong on startup, rollback to read from the original ReadBuffer
|
||||
if (!isInitialized())
|
||||
{
|
||||
LOG_ERROR(log, "RemoteReadBufferCache has not been initialized");
|
||||
return {nullptr, RemoteReadBufferCacheError::NOT_INIT};
|
||||
return {nullptr, std::move(read_buffer), RemoteReadBufferCacheError::NOT_INIT};
|
||||
}
|
||||
|
||||
auto remote_path = remote_file_meta_data->getRemotePath();
|
||||
const auto & last_modification_timestamp = remote_file_meta_data->getLastModificationTimestamp();
|
||||
auto local_path = calculateLocalPath(remote_file_meta_data);
|
||||
std::lock_guard lock(mutex);
|
||||
auto cache_iter = caches.find(local_path);
|
||||
if (cache_iter != caches.end())
|
||||
auto cache = lru_caches->get(local_path);
|
||||
if (cache)
|
||||
{
|
||||
// if the file has been update on remote side, we need to redownload it
|
||||
if (cache_iter->second.cache_controller->checkFileChanged(remote_file_meta_data))
|
||||
// the remote file has been updated, need to redownload
|
||||
if (!cache->isValid() || cache->checkFileChanged(remote_file_meta_data))
|
||||
{
|
||||
LOG_TRACE(
|
||||
log,
|
||||
"Remote file ({}) has been updated. Last saved modification time: {}, actual last modification time: {}",
|
||||
remote_path,
|
||||
std::to_string(cache_iter->second.cache_controller->getLastModificationTimestamp()),
|
||||
std::to_string(cache->getLastModificationTimestamp()),
|
||||
std::to_string(last_modification_timestamp));
|
||||
cache_iter->second.cache_controller->markInvalid();
|
||||
cache->markInvalid();
|
||||
}
|
||||
else
|
||||
{
|
||||
// move the key to the list end
|
||||
keys.splice(keys.end(), keys, cache_iter->second.key_iterator);
|
||||
return { cache_iter->second.cache_controller, RemoteReadBufferCacheError::OK};
|
||||
return {cache, nullptr, RemoteReadBufferCacheError::OK};
|
||||
}
|
||||
}
|
||||
|
||||
auto clear_ret = clearLocalCache();
|
||||
cache_iter = caches.find(local_path);
|
||||
if (cache_iter != caches.end())
|
||||
if (!fs::exists(local_path))
|
||||
fs::create_directories(local_path);
|
||||
|
||||
// cache is not found or is invalid
|
||||
auto new_cache = std::make_shared<RemoteCacheController>(remote_file_meta_data, local_path, local_cache_bytes_read_before_flush);
|
||||
if (!lru_caches->set(local_path, new_cache))
|
||||
{
|
||||
if (cache_iter->second.cache_controller->isValid())
|
||||
{
|
||||
keys.splice(keys.end(), keys, cache_iter->second.key_iterator);
|
||||
return {
|
||||
cache_iter->second.cache_controller,
|
||||
RemoteReadBufferCacheError::OK};
|
||||
}
|
||||
else
|
||||
{
|
||||
// maybe someone is holding this file
|
||||
LOG_INFO(log, "The remote file {} has been updated, but the previous readers do not finish reading.",
|
||||
remote_path);
|
||||
return {nullptr, RemoteReadBufferCacheError::FILE_INVALID};
|
||||
}
|
||||
LOG_ERROR(log, "Insert the new cache failed. new file size:{}, current total size:{}",
|
||||
remote_file_meta_data->getFileSize(),
|
||||
lru_caches->weight());
|
||||
return {nullptr, std::move(read_buffer), RemoteReadBufferCacheError::DISK_FULL};
|
||||
}
|
||||
|
||||
// reach the disk capacity limit
|
||||
if (!clear_ret)
|
||||
{
|
||||
LOG_INFO(log, "Reached local cache capacity limit size ({})", limit_size);
|
||||
return {nullptr, RemoteReadBufferCacheError::DISK_FULL};
|
||||
}
|
||||
|
||||
fs::create_directories(local_path);
|
||||
|
||||
auto cache_controller
|
||||
= std::make_shared<RemoteCacheController>(remote_file_meta_data, local_path, local_cache_bytes_read_before_flush);
|
||||
cache_controller->startBackgroundDownload(read_buffer, context->getSchedulePool());
|
||||
CacheCell cache_cell;
|
||||
cache_cell.cache_controller = cache_controller;
|
||||
cache_cell.key_iterator = keys.insert(keys.end(), local_path);
|
||||
caches[local_path] = cache_cell;
|
||||
|
||||
return {cache_controller, RemoteReadBufferCacheError::OK};
|
||||
}
|
||||
|
||||
bool RemoteReadBufferCache::clearLocalCache()
|
||||
{
|
||||
// clear closable cache from the list head
|
||||
for (auto it = keys.begin(); it != keys.end();)
|
||||
{
|
||||
auto cache_it = caches.find(*it);
|
||||
if (cache_it == caches.end())
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Found no entry in local cache with key: {}", *it);
|
||||
|
||||
auto cache_controller = cache_it->second.cache_controller;
|
||||
if (!cache_controller->isValid() && cache_controller->closable())
|
||||
{
|
||||
LOG_TRACE(log, "Clear invalid cache entry with key {} from local cache", *it);
|
||||
total_size
|
||||
= total_size > cache_it->second.cache_controller->size() ? total_size - cache_it->second.cache_controller->size() : 0;
|
||||
cache_controller->close();
|
||||
it = keys.erase(it);
|
||||
caches.erase(cache_it);
|
||||
continue;
|
||||
}
|
||||
|
||||
// if enough disk space is release, just to iterate the remained caches and clear the invalid ones.
|
||||
if (total_size > limit_size && cache_controller->closable())
|
||||
{
|
||||
total_size = total_size > cache_controller->size() ? total_size - cache_controller->size() : 0;
|
||||
cache_controller->close();
|
||||
caches.erase(cache_it);
|
||||
it = keys.erase(it);
|
||||
LOG_TRACE(
|
||||
log,
|
||||
"clear local file {} for {}. key size:{}. next{}",
|
||||
cache_controller->getLocalPath().string(),
|
||||
cache_controller->getRemotePath(),
|
||||
keys.size(),
|
||||
*it);
|
||||
}
|
||||
else
|
||||
it++;
|
||||
}
|
||||
LOG_TRACE(log, "After clear local cache, keys size:{}, total_size:{}, limit size:{}", keys.size(), total_size, limit_size);
|
||||
return total_size < limit_size;
|
||||
new_cache->startBackgroundDownload(std::move(read_buffer), context->getSchedulePool());
|
||||
return {new_cache, nullptr, RemoteReadBufferCacheError::OK};
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -7,6 +7,7 @@
|
||||
#include <filesystem>
|
||||
#include <Core/BackgroundSchedulePool.h>
|
||||
#include <Poco/Logger.h>
|
||||
#include <Common/UnreleasableLRUCache.h>
|
||||
#include <Common/ThreadPool.h>
|
||||
#include <IO/ReadBuffer.h>
|
||||
#include <IO/BufferWithOwnMemory.h>
|
||||
@ -96,6 +97,7 @@ public:
|
||||
return valid;
|
||||
}
|
||||
RemoteFileMetaDataBasePtr getFileMetaData() { return file_meta_data_ptr; }
|
||||
inline size_t getFileSize() const { return file_meta_data_ptr->getFileSize(); }
|
||||
|
||||
void startBackgroundDownload(std::shared_ptr<ReadBuffer> input_readbuffer, BackgroundSchedulePool & thread_pool);
|
||||
|
||||
@ -137,25 +139,47 @@ class RemoteReadBuffer : public BufferWithOwnMemory<SeekableReadBufferWithSize>
|
||||
public:
|
||||
explicit RemoteReadBuffer(size_t buff_size);
|
||||
~RemoteReadBuffer() override;
|
||||
static std::unique_ptr<RemoteReadBuffer> create(ContextPtr contex, RemoteFileMetaDataBasePtr remote_file_meta_data, std::unique_ptr<ReadBuffer> read_buffer);
|
||||
static std::unique_ptr<ReadBuffer> create(ContextPtr contex, RemoteFileMetaDataBasePtr remote_file_meta_data, std::unique_ptr<ReadBuffer> read_buffer);
|
||||
|
||||
bool nextImpl() override;
|
||||
inline bool seekable() { return !file_buffer && file_cache_controller->getFileMetaData()->getFileSize() > 0; }
|
||||
inline bool seekable() { return file_buffer && remote_file_size > 0; }
|
||||
off_t seek(off_t off, int whence) override;
|
||||
off_t getPosition() override;
|
||||
std::optional<size_t> getTotalSize() override { return file_cache_controller->getFileMetaData()->getFileSize(); }
|
||||
std::optional<size_t> getTotalSize() override { return remote_file_size; }
|
||||
|
||||
private:
|
||||
std::shared_ptr<RemoteCacheController> file_cache_controller;
|
||||
std::unique_ptr<ReadBufferFromFileBase> file_buffer;
|
||||
size_t remote_file_size = 0;
|
||||
};
|
||||
|
||||
// in case local cache don't work, this buffer is setted;
|
||||
std::shared_ptr<ReadBuffer> original_read_buffer;
|
||||
struct RemoteFileCacheWeightFunction
|
||||
{
|
||||
size_t operator()(const RemoteCacheController & cache) const
|
||||
{
|
||||
return cache.getFileSize();
|
||||
}
|
||||
};
|
||||
|
||||
struct RemoteFileCacheEvictPolicy
|
||||
{
|
||||
CacheEvictStatus canRelease(RemoteCacheController & cache) const
|
||||
{
|
||||
if (cache.closable())
|
||||
return CacheEvictStatus::CAN_EVITCT;
|
||||
return CacheEvictStatus::SKIP_EVICT;
|
||||
}
|
||||
void release(RemoteCacheController & cache)
|
||||
{
|
||||
cache.close();
|
||||
}
|
||||
};
|
||||
|
||||
class RemoteReadBufferCache
|
||||
{
|
||||
public:
|
||||
using CacheType = UnreleasableLRUCache<String, RemoteCacheController, std::hash<String>,
|
||||
RemoteFileCacheWeightFunction, RemoteFileCacheEvictPolicy>;
|
||||
~RemoteReadBufferCache();
|
||||
// global instance
|
||||
static RemoteReadBufferCache & instance();
|
||||
@ -164,8 +188,8 @@ public:
|
||||
|
||||
inline bool isInitialized() const { return initialized; }
|
||||
|
||||
std::pair<RemoteCacheControllerPtr, RemoteReadBufferCacheError>
|
||||
createReader(ContextPtr context, RemoteFileMetaDataBasePtr remote_file_meta_data, std::shared_ptr<ReadBuffer> & read_buffer);
|
||||
std::tuple<RemoteCacheControllerPtr, std::unique_ptr<ReadBuffer>, RemoteReadBufferCacheError>
|
||||
createReader(ContextPtr context, RemoteFileMetaDataBasePtr remote_file_meta_data, std::unique_ptr<ReadBuffer> & read_buffer);
|
||||
|
||||
void updateTotalSize(size_t size) { total_size += size; }
|
||||
|
||||
@ -175,23 +199,15 @@ protected:
|
||||
private:
|
||||
// root directory of local cache for remote filesystem
|
||||
String root_dir;
|
||||
size_t limit_size = 0;
|
||||
size_t local_cache_bytes_read_before_flush = 0;
|
||||
|
||||
std::atomic<bool> initialized = false;
|
||||
std::atomic<size_t> total_size;
|
||||
std::mutex mutex;
|
||||
std::unique_ptr<CacheType> lru_caches;
|
||||
|
||||
Poco::Logger * log = &Poco::Logger::get("RemoteReadBufferCache");
|
||||
|
||||
struct CacheCell
|
||||
{
|
||||
std::list<String>::iterator key_iterator;
|
||||
std::shared_ptr<RemoteCacheController> cache_controller;
|
||||
};
|
||||
std::list<String> keys;
|
||||
std::map<String, CacheCell> caches;
|
||||
|
||||
String calculateLocalPath(RemoteFileMetaDataBasePtr meta) const;
|
||||
|
||||
BackgroundSchedulePool::TaskHolder recover_task_holder;
|
||||
@ -200,7 +216,6 @@ private:
|
||||
const std::filesystem::path & current_path,
|
||||
size_t current_depth,
|
||||
size_t max_depth);
|
||||
bool clearLocalCache();
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -5,14 +5,9 @@
|
||||
|
||||
namespace DB
|
||||
{
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int BAD_ARGUMENTS;
|
||||
}
|
||||
|
||||
HiveFileMetaData::~HiveFileMetaData() = default;
|
||||
|
||||
String HiveFileMetaData::toString()
|
||||
String HiveFileMetaData::toString() const
|
||||
{
|
||||
Poco::JSON::Object jobj;
|
||||
jobj.set("schema", schema);
|
||||
@ -40,12 +35,9 @@ bool HiveFileMetaData::fromString(const String &buf)
|
||||
return true;
|
||||
}
|
||||
|
||||
bool HiveFileMetaData::equal(RemoteFileMetaDataBasePtr meta_data)
|
||||
String HiveFileMetaData::getVersion() const
|
||||
{
|
||||
auto real_meta_data = std::dynamic_pointer_cast<HiveFileMetaData>(meta_data);
|
||||
if (!real_meta_data)
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid meta data class");
|
||||
return last_modification_timestamp == real_meta_data->last_modification_timestamp;
|
||||
return std::to_string(getLastModificationTimestamp());
|
||||
}
|
||||
|
||||
REGISTTER_REMOTE_FILE_META_DATA_CLASS(HiveFileMetaData)
|
||||
|
@ -14,16 +14,10 @@ public:
|
||||
RemoteFileMetaDataBase(schema_, cluster_, remote_path_, file_size_, last_modification_timestamp_){}
|
||||
~HiveFileMetaData() override;
|
||||
|
||||
String getClassName() override { return "HiveFileMetaData"; }
|
||||
String getClassName() const override { return "HiveFileMetaData"; }
|
||||
|
||||
RemoteFileMetaDataBasePtr clone() override
|
||||
{
|
||||
auto result = std::make_shared<HiveFileMetaData>(schema, cluster, remote_path, file_size, last_modification_timestamp);
|
||||
return result;
|
||||
}
|
||||
String toString() override;
|
||||
String toString() const override;
|
||||
bool fromString(const String &buf) override;
|
||||
bool equal(RemoteFileMetaDataBasePtr meta_data) override;
|
||||
|
||||
String getVersion() const override;
|
||||
};
|
||||
}
|
||||
|
@ -169,10 +169,9 @@ public:
|
||||
|
||||
// Use local cache for remote filesystem if enabled.
|
||||
std::unique_ptr<ReadBuffer> remote_read_buf;
|
||||
bool x = false;
|
||||
if (RemoteReadBufferCache::instance().isInitialized() && getContext()->getSettingsRef().use_local_cache_for_remote_fs && x)
|
||||
if (RemoteReadBufferCache::instance().isInitialized() && getContext()->getSettingsRef().use_local_cache_for_remote_fs)
|
||||
remote_read_buf = RemoteReadBuffer::create(getContext(),
|
||||
std::make_shared<HiveFileMetaData>("Hive", getNameNodeCluster(hdfs_namenode_url), uri_with_path, curr_file->getLastModTs(), curr_file->getSize()),
|
||||
std::make_shared<HiveFileMetaData>("Hive", getNameNodeCluster(hdfs_namenode_url), uri_with_path, curr_file->getSize(), curr_file->getLastModTs()),
|
||||
std::move(raw_read_buf));
|
||||
else
|
||||
remote_read_buf = std::move(raw_read_buf);
|
||||
|
Loading…
Reference in New Issue
Block a user