ClickHouse/src/IO/RemoteReadBufferCache.cpp

590 lines
20 KiB
C++
Raw Normal View History

2021-11-29 03:30:11 +00:00
#include <fstream>
#include <memory>
#include <unistd.h>
2021-11-05 11:55:30 +00:00
#include <functional>
#include <Poco/Logger.h>
#include <Poco/JSON/JSON.h>
#include <Poco/JSON/Parser.h>
#include <base/logger_useful.h>
2021-11-29 03:30:11 +00:00
#include <Common/SipHash.h>
#include <Common/hex.h>
#include <Common/Exception.h>
#include <IO/RemoteReadBufferCache.h>
#include <IO/WriteHelpers.h>
2021-11-05 11:55:30 +00:00
namespace DB
{
/// Error codes thrown by this translation unit. They are only declared here
/// (`extern`); the definitions are not part of this file.
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int BAD_GET;
extern const int LOGICAL_ERROR;
}
2021-11-29 03:30:11 +00:00
std::shared_ptr<RemoteCacheController> RemoteCacheController::recover(
const std::filesystem::path & local_path_, std::function<void(RemoteCacheController *)> const & finish_callback)
2021-11-05 11:55:30 +00:00
{
std::filesystem::path data_file = local_path_ / "data.bin";
std::filesystem::path meta_file = local_path_ / "meta.txt";
2021-11-29 04:01:52 +00:00
auto * log = &Poco::Logger::get("RemoteCacheController");
if (!std::filesystem::exists(data_file) || !std::filesystem::exists(meta_file))
2021-11-05 11:55:30 +00:00
{
2021-11-29 04:01:52 +00:00
LOG_ERROR(log, "Directory {} or file {}, {} does not exist", local_path_.string(), data_file.string(), meta_file.string());
2021-11-05 11:55:30 +00:00
return nullptr;
}
std::ifstream meta_fs(meta_file);
2021-11-05 11:55:30 +00:00
Poco::JSON::Parser meta_parser;
auto meta_jobj = meta_parser.parse(meta_fs).extract<Poco::JSON::Object::Ptr>();
auto remote_path = meta_jobj->get("remote_path").convert<std::string>();
auto schema = meta_jobj->get("schema").convert<std::string>();
auto cluster = meta_jobj->get("cluster").convert<std::string>();
auto downloaded = meta_jobj->get("downloaded").convert<std::string>();
auto modification_ts = meta_jobj->get("last_modification_timestamp").convert<UInt64>();
2021-11-05 11:55:30 +00:00
if (downloaded == "false")
{
2021-11-29 04:17:22 +00:00
LOG_ERROR(log, "not a downloaded file: " + local_path_.string());
2021-11-05 11:55:30 +00:00
return nullptr;
}
auto file_size = std::filesystem::file_size(data_file);
2021-11-05 11:55:30 +00:00
2021-11-29 03:30:11 +00:00
RemoteFileMeta remote_file_meta(schema, cluster, remote_path, modification_ts, file_size);
auto cntrl = std::make_shared<RemoteCacheController>(remote_file_meta, local_path_, 0, nullptr, finish_callback);
2021-11-05 11:55:30 +00:00
cntrl->download_finished = true;
cntrl->current_offset = file_size;
2021-11-05 11:55:30 +00:00
meta_fs.close();
finish_callback(cntrl.get());
return cntrl;
}
/// Builds a controller for one remote file cached under `local_path_`.
///
/// When `readbuffer_` is non-null, the constructor creates `data.bin`, writes an initial
/// `meta.txt` with "downloaded" set to "false", and schedules the background download.
/// When it is null (as recover() passes), only the descriptive fields are filled in.
///
/// @param remote_file_meta          Schema, cluster, path and modification timestamp of the remote file.
/// @param local_path_               Local cache directory for this file.
/// @param cache_bytes_before_flush_ Number of downloaded bytes after which progress is flushed and published.
/// @param readbuffer_               Source of the remote data, or nullptr.
/// @param finish_callback           Forwarded to the background download.
RemoteCacheController::RemoteCacheController(
    const RemoteFileMeta & remote_file_meta,
    const std::filesystem::path & local_path_,
    size_t cache_bytes_before_flush_,
    std::shared_ptr<ReadBuffer> readbuffer_,
    std::function<void(RemoteCacheController *)> const & finish_callback)
{
    schema = remote_file_meta.schema;
    cluster = remote_file_meta.cluster;
    remote_path = remote_file_meta.path;
    last_modification_timestamp = remote_file_meta.last_modification_timestamp;
    local_path = local_path_;
    local_cache_bytes_read_before_flush = cache_bytes_before_flush_;
    valid = true;

    /// Nothing to download without a source buffer.
    if (readbuffer_ == nullptr)
        return;

    download_finished = false;
    current_offset = 0;
    remote_readbuffer = readbuffer_;

    /// Create the (still empty) local data file.
    out_file = std::make_unique<std::ofstream>(local_path_ / "data.bin", std::ios::out | std::ios::binary);
    out_file->flush();

    /// Persist the initial metadata, marked as not downloaded yet.
    Poco::JSON::Object meta;
    meta.set("schema", schema);
    meta.set("cluster", cluster);
    meta.set("remote_path", remote_path);
    meta.set("downloaded", "false");
    meta.set("last_modification_timestamp", last_modification_timestamp);

    std::stringstream serialized; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
    meta.stringify(serialized);
    const std::string meta_text = serialized.str();

    std::ofstream meta_out(local_path_ / "meta.txt", std::ios::out);
    meta_out.write(meta_text.data(), meta_text.size());
    meta_out.close();

    backgroupDownload(finish_callback);
}
/// Blocks until the byte range [start_offset_, end_offset_) can be served from the local file.
///
/// @return END_OF_FILE when the download has already finished and `start_offset_` lies at or
///         past the downloaded size; OK otherwise (after waiting, if necessary, until either
///         `end_offset_` bytes are available or the download finishes).
RemoteReadBufferCacheError RemoteCacheController::waitMoreData(size_t start_offset_, size_t end_offset_)
{
    /// The guard releases the mutex on every return path.
    std::unique_lock guard{mutex};

    if (download_finished)
    {
        if (start_offset_ >= current_offset)
            return RemoteReadBufferCacheError::END_OF_FILE;
        return RemoteReadBufferCacheError::OK;
    }

    /// Download still running: sleep until enough bytes were published or it completed.
    if (current_offset < end_offset_)
        more_data_signal.wait(guard, [this, end_offset_] { return this->download_finished || this->current_offset >= end_offset_; });

    return RemoteReadBufferCacheError::OK;
}
/// Schedules a thread-pool task that copies the remote read buffer into the local data file.
///
/// Progress is published (current_offset advanced, file flushed, waiters notified) every time at
/// least `local_cache_bytes_read_before_flush` bytes have accumulated. When the source is
/// exhausted the task marks the download finished, rewrites the metadata, releases the file and
/// the source buffer, wakes all waiters and finally calls `finish_callback(this)`.
///
/// NOTE(review): the task captures a raw `this`; presumably the cache keeps the controller alive
/// until the task completes — confirm against the owner of the controller.
void RemoteCacheController::backgroupDownload(std::function<void(RemoteCacheController *)> const & finish_callback)
{
    auto download_job = [this, finish_callback]()
    {
        /// Bytes already written to out_file but not yet added to current_offset.
        size_t pending_bytes = 0;

        while (!remote_readbuffer->eof())
        {
            const size_t chunk = remote_readbuffer->available();
            out_file->write(remote_readbuffer->position(), chunk);
            remote_readbuffer->position() += chunk;
            pending_bytes += chunk;

            if (pending_bytes < local_cache_bytes_read_before_flush)
                continue;

            {
                std::unique_lock lock(mutex);
                current_offset += pending_bytes;
                flush();
            }
            pending_bytes = 0;
            more_data_signal.notify_all();
        }

        {
            std::unique_lock lock(mutex);
            current_offset += pending_bytes;
            download_finished = true;
            flush(true);
            out_file->close();
            out_file = nullptr;
            remote_readbuffer = nullptr;
        }
        more_data_signal.notify_all();

        finish_callback(this);
        LOG_TRACE(log, "finish download.{} into {}. size:{} ", remote_path, local_path.string(), current_offset);
    };

    RemoteReadBufferCache::instance().getThreadPool()->scheduleOrThrow(download_job);
}
2021-11-11 11:50:08 +00:00
void RemoteCacheController::flush(bool need_flush_meta_)
2021-11-05 11:55:30 +00:00
{
if (out_file)
2021-11-05 11:55:30 +00:00
{
out_file->flush();
}
2021-11-11 11:50:08 +00:00
if (!need_flush_meta_)
return;
2021-11-05 11:55:30 +00:00
Poco::JSON::Object jobj;
jobj.set("schema", schema);
jobj.set("cluster", cluster);
jobj.set("remote_path", remote_path);
jobj.set("downloaded", download_finished ? "true" : "false");
2021-11-11 11:50:08 +00:00
jobj.set("last_modification_timestamp", last_modification_timestamp);
2021-11-29 03:30:11 +00:00
std::stringstream buf; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
2021-11-05 11:55:30 +00:00
jobj.stringify(buf);
std::ofstream meta_file(local_path / "meta.txt", std::ios::out);
2021-11-05 11:55:30 +00:00
meta_file << buf.str();
meta_file.close();
}
/// Defaulted destructor. It does not delete the local cache directory; close() does that.
RemoteCacheController::~RemoteCacheController() = default;
2021-11-05 11:55:30 +00:00
void RemoteCacheController::close()
{
// delete the directory
2021-11-29 04:17:22 +00:00
LOG_TRACE(log, "release local resource: " + remote_path + ", " + local_path.string());
std::filesystem::remove_all(local_path);
2021-11-05 11:55:30 +00:00
}
2021-11-29 04:17:22 +00:00
std::pair<FILE *, std::filesystem::path> RemoteCacheController::allocFile()
2021-11-05 11:55:30 +00:00
{
std::filesystem::path result_local_path;
2021-11-05 11:55:30 +00:00
if (download_finished)
2021-11-29 03:30:11 +00:00
result_local_path = local_path / "data.bin";
FILE * fs = fopen((local_path / "data.bin").string().c_str(), "r");
2021-11-05 11:55:30 +00:00
if (fs == nullptr)
return {fs, result_local_path};
2021-11-05 11:55:30 +00:00
std::lock_guard lock{mutex};
opened_file_streams.insert(fs);
return {fs, result_local_path};
2021-11-05 11:55:30 +00:00
}
/// Unregisters and closes a stream previously handed out by allocFile().
///
/// @throws Exception(BAD_ARGUMENTS) if `fs` is not registered with this controller; in that
///         case the stream is not closed.
void RemoteCacheController::deallocFile(FILE * fs)
{
    {
        std::lock_guard lock{mutex};
        const auto registered = opened_file_streams.find(fs);
        if (registered == opened_file_streams.end())
            throw Exception("try to close an invalid file " + remote_path, ErrorCodes::BAD_ARGUMENTS);
        opened_file_streams.erase(registered);
    }

    /// The stream is closed after the mutex has been released.
    fclose(fs);
}
/// Opens a reader on the local file managed by `cntrl_`.
///
/// @param cntrl_  Controller that owns the cached file; it is stored as a raw pointer.
/// @param size_   Known size of the file, or 0 to have size() look it up on disk.
/// @throws Exception(BAD_GET) if the controller could not open the local data file.
LocalCachedFileReader::LocalCachedFileReader(RemoteCacheController * cntrl_, size_t size_)
    : offset(0), file_size(size_), fs(nullptr), controller(cntrl_)
{
    auto allocated = controller->allocFile();
    fs = allocated.first;
    local_path = allocated.second;

    if (fs == nullptr)
        throw Exception("alloc file failed.", ErrorCodes::BAD_GET);
}
/// Returns the file stream to the controller.
///
/// deallocFile() throws when the stream is not registered with the controller. An exception
/// leaving a destructor terminates the process, so the failure is logged here instead.
LocalCachedFileReader::~LocalCachedFileReader()
{
    try
    {
        controller->deallocFile(fs);
    }
    catch (...)
    {
        LOG_ERROR(log, "failed to release the cached file stream of {}", local_path.string());
    }
}
/// Reads up to `size` bytes at the current offset into `buf`.
///
/// Waits first until the controller reports the requested range as readable.
/// @return Number of bytes read; 0 when the controller does not report OK (e.g. end of file).
size_t LocalCachedFileReader::read(char * buf, size_t size)
{
    if (controller->waitMoreData(offset, offset + size) != RemoteReadBufferCacheError::OK)
        return 0;

    std::lock_guard lock(mutex);
    const auto bytes_read = fread(buf, 1, size, fs);
    offset += bytes_read;
    return bytes_read;
}
/// Moves the read position of the local file to the absolute offset `off`.
///
/// @return `off` on success, -1 if `fseek` failed. On failure the tracked offset is left
///         untouched so that it keeps matching the real position of the stream.
off_t LocalCachedFileReader::seek(off_t off)
{
    /// NOTE(review): the end offset passed here is the constant 1, not `off + 1`, so this only
    /// waits for the first byte of the file (or the end of the download). read() waits for the
    /// real range afterwards; confirm whether a stricter wait is intended here.
    controller->waitMoreData(off, 1);

    std::lock_guard lock(mutex);
    if (fseek(fs, off, SEEK_SET) != 0)
        return -1;

    offset = off;
    return off;
}
size_t LocalCachedFileReader::size()
{
if (file_size != 0)
return file_size;
if (local_path.empty())
{
2021-11-29 04:17:22 +00:00
LOG_TRACE(log, "empty local_path");
2021-11-05 11:55:30 +00:00
return 0;
}
auto ret = std::filesystem::file_size(local_path);
2021-11-05 11:55:30 +00:00
file_size = ret;
return ret;
}
/// Allocates the buffer's own memory with `buff_size` bytes.
/// The size needs to be equal to the size of the original read buffer.
RemoteReadBuffer::RemoteReadBuffer(size_t buff_size)
    : BufferWithOwnMemory<SeekableReadBuffer>(buff_size)
{
}
/// Defaulted destructor; the definition is provided in this translation unit.
RemoteReadBuffer::~RemoteReadBuffer() = default;
2021-11-29 03:30:11 +00:00
std::unique_ptr<RemoteReadBuffer> RemoteReadBuffer::create(const RemoteFileMeta & remote_file_meta_, std::unique_ptr<ReadBuffer> readbuffer)
2021-11-05 11:55:30 +00:00
{
2021-11-29 04:17:22 +00:00
auto * log = &Poco::Logger::get("RemoteReadBuffer");
2021-11-05 11:55:30 +00:00
size_t buff_size = DBMS_DEFAULT_BUFFER_SIZE;
if (readbuffer != nullptr)
buff_size = readbuffer->internalBuffer().size();
/*
* in the new implement of ReadBufferFromHDFS, buffer size is 0.
*
* in the common case, we don't read bytes from readbuffer directly, so set buff_size = DBMS_DEFAULT_BUFFER_SIZE
* is OK.
*
* we need be careful with the case without local file reader.
*/
if (buff_size == 0)
buff_size = DBMS_DEFAULT_BUFFER_SIZE;
2021-11-12 08:43:35 +00:00
const auto & remote_path = remote_file_meta_.path;
2021-11-05 11:55:30 +00:00
auto rrb = std::make_unique<RemoteReadBuffer>(buff_size);
auto * raw_rbp = readbuffer.release();
std::shared_ptr<ReadBuffer> srb(raw_rbp);
RemoteReadBufferCacheError error;
2021-11-05 11:55:30 +00:00
int retry = 0;
do
{
if (retry > 0)
usleep(20 * retry);
2021-11-29 03:30:11 +00:00
std::tie(rrb->file_reader, error) = RemoteReadBufferCache::instance().createReader(remote_file_meta_, srb);
2021-11-05 11:55:30 +00:00
retry++;
} while (error == RemoteReadBufferCacheError::FILE_INVALID && retry < 10);
2021-11-05 11:55:30 +00:00
if (rrb->file_reader == nullptr)
{
2021-11-29 04:17:22 +00:00
LOG_ERROR(log, "allocate local file failed for " + remote_path + "{}", error);
2021-11-05 11:55:30 +00:00
rrb->original_readbuffer = srb;
}
return rrb;
}
/// Refills the working buffer.
///
/// With a local file reader, data is read into this buffer's own memory. Without one, the
/// next chunk is taken from the original read buffer and this buffer is re-pointed at it.
///
/// @return true if new data is available, false at end of data.
/// @throws Exception(LOGICAL_ERROR) if neither a file reader nor an original buffer is set.
bool RemoteReadBuffer::nextImpl()
{
    if (file_reader != nullptr)
    {
        /// read() returns size_t; keep that type instead of narrowing it to int.
        const size_t bytes_read = file_reader->read(internal_buffer.begin(), internal_buffer.size());
        if (bytes_read == 0)
            return false;

        working_buffer.resize(bytes_read);
        return true;
    }

    /// In the case we cannot use local cache, read from the original readbuffer directly.
    if (original_readbuffer == nullptr)
        throw Exception("original readbuffer should not be null", ErrorCodes::LOGICAL_ERROR);

    const bool has_data = original_readbuffer->next();
    /// We don't need to worry about the memory buffer allocated in RemoteReadBuffer, since it is
    /// owned by BufferWithOwnMemory, BufferWithOwnMemory would release it.
    if (has_data)
        BufferBase::set(original_readbuffer->buffer().begin(), original_readbuffer->buffer().size(), original_readbuffer->offset());
    return has_data;
}
/// Moves the read position within the cached file.
///
/// @param offset  Target offset, absolute for SEEK_SET or relative to the current position for SEEK_CUR.
/// @param whence  SEEK_SET or SEEK_CUR.
/// @return The new absolute position.
/// @throws Exception(BAD_ARGUMENTS) when there is no local file reader (fallback mode set up by
///         create()), for an unsupported `whence`, or when seeking the file fails.
off_t RemoteReadBuffer::seek(off_t offset, int whence)
{
    /// create() leaves file_reader null when the cache is unavailable; fail with an exception
    /// instead of dereferencing it.
    if (file_reader == nullptr)
        throw Exception("seek is not supported without a local cache file", ErrorCodes::BAD_ARGUMENTS);

    /// File offset of the end of the working buffer.
    const off_t pos_in_file = file_reader->getOffset();
    const off_t unread_bytes = working_buffer.end() - pos;
    const off_t buffered_bytes = static_cast<off_t>(working_buffer.size());

    off_t new_pos;
    if (whence == SEEK_SET)
        new_pos = offset;
    else if (whence == SEEK_CUR)
        new_pos = pos_in_file - unread_bytes + offset;
    else
        throw Exception("expects SEEK_SET or SEEK_CUR as whence", ErrorCodes::BAD_ARGUMENTS);

    /// Position is unchanged.
    if (new_pos + unread_bytes == pos_in_file)
        return new_pos;

    if (new_pos <= pos_in_file && new_pos >= pos_in_file - buffered_bytes)
    {
        /// Position is still inside buffer.
        pos = working_buffer.begin() + (new_pos - (pos_in_file - buffered_bytes));
        return new_pos;
    }

    /// Drop the buffered data and reposition the underlying file.
    pos = working_buffer.end();
    auto ret_off = file_reader->seek(new_pos);
    if (ret_off == -1)
        throw Exception(
            "seek file failed. " + std::to_string(pos_in_file) + "->" + std::to_string(new_pos) + "@" + std::to_string(file_reader->size())
                + "," + std::to_string(whence) + "," + file_reader->getPath(),
            ErrorCodes::BAD_ARGUMENTS);
    return ret_off;
}
off_t RemoteReadBuffer::getPosition()
{
return file_reader->getOffset() - available();
2021-11-05 11:55:30 +00:00
}
/// Defaulted constructor; the cache is configured later by initOnce().
RemoteReadBufferCache::RemoteReadBufferCache() = default;
/// Waits for the tasks scheduled on the thread pool before the cache is destroyed.
///
/// The pool is only created by initOnce(), so it is still null when the cache was never
/// initialised; in that case there is nothing to wait for.
RemoteReadBufferCache::~RemoteReadBufferCache()
{
    if (thread_pool)
        thread_pool->wait();
}
2021-11-05 11:55:30 +00:00
/// Returns the process-wide cache object, a function-local static created on first use.
RemoteReadBufferCache & RemoteReadBufferCache::instance()
{
    static RemoteReadBufferCache cache;
    return cache;
}
2021-11-15 02:09:21 +00:00
void RemoteReadBufferCache::recoverCachedFilesMeta(
2021-11-29 03:30:11 +00:00
const std::filesystem::path & current_path,
size_t current_depth,
size_t max_depth,
std::function<void(RemoteCacheController *)> const & finish_callback)
{
if (current_depth >= max_depth)
{
for (auto const & dir : std::filesystem::directory_iterator{current_path})
{
std::string path = dir.path();
auto cache_controller = RemoteCacheController::recover(path, finish_callback);
if (!cache_controller)
continue;
2021-11-29 03:30:11 +00:00
auto & cell = caches[path];
cell.cache_controller = cache_controller;
cell.key_iterator = keys.insert(keys.end(), path);
}
return;
}
2021-11-29 03:30:11 +00:00
for (auto const & dir : std::filesystem::directory_iterator{current_path})
{
recoverCachedFilesMeta(dir.path(), current_depth + 1, max_depth, finish_callback);
}
}
2021-11-29 03:30:11 +00:00
void RemoteReadBufferCache::initOnce(
const std::filesystem::path & dir, size_t limit_size_, size_t bytes_read_before_flush_, size_t max_threads)
2021-11-05 11:55:30 +00:00
{
2021-11-29 04:01:52 +00:00
LOG_INFO(
log,
"Initializing local cache for remote data sources. Local cache root path: {}, cache size limit: {}",
dir.string(),
limit_size_);
2021-11-05 11:55:30 +00:00
local_path_prefix = dir;
limit_size = limit_size_;
2021-11-11 11:50:08 +00:00
local_cache_bytes_read_before_flush = bytes_read_before_flush_;
2021-11-29 03:47:31 +00:00
thread_pool = std::make_shared<FreeThreadPool>(max_threads, 1000, 1000, false);
2021-11-05 11:55:30 +00:00
// scan local disk dir and recover the cache metas
std::filesystem::path root_dir(local_path_prefix);
if (!std::filesystem::exists(root_dir))
{
LOG_INFO(log, "{} not exists. this cache will be disable", local_path_prefix);
2021-11-05 11:55:30 +00:00
return;
}
2021-11-29 03:30:11 +00:00
auto recover_task = [this, root_dir]() {
auto callback = [this](RemoteCacheController * cntrl) { this->total_size += cntrl->size(); };
std::lock_guard lock(this->mutex);
// two level dir. /<first 3 chars of path hash code>/<path hash code>
recoverCachedFilesMeta(root_dir, 1, 2, callback);
this->inited = true;
LOG_TRACE(this->log, "recovered from disk ");
};
2021-11-29 03:47:31 +00:00
getThreadPool()->scheduleOrThrow(recover_task);
2021-11-05 11:55:30 +00:00
}
std::filesystem::path RemoteReadBufferCache::calculateLocalPath(const RemoteFileMeta & meta)
2021-11-05 11:55:30 +00:00
{
2021-11-15 02:09:21 +00:00
std::string full_path = meta.schema + ":" + meta.cluster + ":" + meta.path;
UInt128 hashcode = sipHash128(full_path.c_str(), full_path.size());
std::string hashcode_str = getHexUIntLowercase(hashcode);
2021-11-29 03:30:11 +00:00
return std::filesystem::path(local_path_prefix) / hashcode_str.substr(0, 3) / hashcode_str;
2021-11-05 11:55:30 +00:00
}
2021-11-29 03:30:11 +00:00
std::tuple<std::shared_ptr<LocalCachedFileReader>, RemoteReadBufferCacheError>
RemoteReadBufferCache::createReader(const RemoteFileMeta & remote_file_meta, std::shared_ptr<ReadBuffer> & readbuffer)
2021-11-05 11:55:30 +00:00
{
2021-11-15 11:08:58 +00:00
// If something is wrong on startup, rollback to read from the original ReadBuffer
2021-11-05 11:55:30 +00:00
if (!hasInitialized())
{
LOG_ERROR(log, "RemoteReadBufferCache has not initialized");
return {nullptr, RemoteReadBufferCacheError::NOT_INIT};
2021-11-05 11:55:30 +00:00
}
auto remote_path = remote_file_meta.path;
2021-11-12 08:43:35 +00:00
const auto & file_size = remote_file_meta.file_size;
const auto & last_modification_timestamp = remote_file_meta.last_modification_timestamp;
auto local_path = calculateLocalPath(remote_file_meta);
2021-11-05 11:55:30 +00:00
std::lock_guard lock(mutex);
auto cache_iter = caches.find(local_path);
if (cache_iter != caches.end())
2021-11-05 11:55:30 +00:00
{
// if the file has been update on remote side, we need to redownload it
if (cache_iter->second.cache_controller->getLastModificationTimestamp() != last_modification_timestamp)
2021-11-05 11:55:30 +00:00
{
2021-11-29 03:30:11 +00:00
LOG_TRACE(
log,
"remote file has been updated. " + remote_path + ":"
+ std::to_string(cache_iter->second.cache_controller->getLastModificationTimestamp()) + "->"
+ std::to_string(last_modification_timestamp));
cache_iter->second.cache_controller->markInvalid();
2021-11-05 11:55:30 +00:00
}
else
{
// move the key to the list end
keys.splice(keys.end(), keys, cache_iter->second.key_iterator);
2021-11-29 03:30:11 +00:00
return {
std::make_shared<LocalCachedFileReader>(cache_iter->second.cache_controller.get(), file_size),
RemoteReadBufferCacheError::OK};
2021-11-05 11:55:30 +00:00
}
}
auto clear_ret = clearLocalCache();
cache_iter = caches.find(local_path);
if (cache_iter != caches.end())
2021-11-05 11:55:30 +00:00
{
if (cache_iter->second.cache_controller->isValid())
2021-11-05 11:55:30 +00:00
{
2021-11-05 12:27:37 +00:00
// move the key to the list end, this case should not happen?
keys.splice(keys.end(), keys, cache_iter->second.key_iterator);
2021-11-29 03:30:11 +00:00
return {
std::make_shared<LocalCachedFileReader>(cache_iter->second.cache_controller.get(), file_size),
RemoteReadBufferCacheError::OK};
2021-11-05 11:55:30 +00:00
}
else
{
// maybe someone is holding this file
return {nullptr, RemoteReadBufferCacheError::FILE_INVALID};
2021-11-05 11:55:30 +00:00
}
}
// reach the disk capacity limit
if (!clear_ret)
{
LOG_ERROR(log, "local cache is full, return nullptr");
return {nullptr, RemoteReadBufferCacheError::DISK_FULL};
2021-11-05 11:55:30 +00:00
}
2021-11-11 11:50:08 +00:00
std::filesystem::create_directories(local_path);
2021-11-05 11:55:30 +00:00
auto callback = [this](RemoteCacheController * cntrl) { this->total_size += cntrl->size(); };
2021-11-29 03:30:11 +00:00
auto cache_cntrl
= std::make_shared<RemoteCacheController>(remote_file_meta, local_path, local_cache_bytes_read_before_flush, readbuffer, callback);
2021-11-05 11:55:30 +00:00
CacheCell cc;
cc.cache_controller = cache_cntrl;
cc.key_iterator = keys.insert(keys.end(), local_path);
caches[local_path] = cc;
return {std::make_shared<LocalCachedFileReader>(cache_cntrl.get(), file_size), RemoteReadBufferCacheError::OK};
2021-11-05 11:55:30 +00:00
}
bool RemoteReadBufferCache::clearLocalCache()
{
for (auto it = keys.begin(); it != keys.end();)
{
auto cache_it = caches.find(*it);
auto cntrl = cache_it->second.cache_controller;
2021-11-05 11:55:30 +00:00
if (!cntrl->isValid() && cntrl->closable())
{
LOG_TRACE(log, "clear invalid cache: " + *it);
2021-11-29 03:30:11 +00:00
total_size
= total_size > cache_it->second.cache_controller->size() ? total_size - cache_it->second.cache_controller->size() : 0;
2021-11-05 11:55:30 +00:00
cntrl->close();
it = keys.erase(it);
caches.erase(cache_it);
2021-11-05 11:55:30 +00:00
}
else
it++;
}
// clear closable cache from the list head
for (auto it = keys.begin(); it != keys.end();)
{
if (total_size < limit_size)
break;
auto cache_it = caches.find(*it);
if (cache_it == caches.end())
2021-11-05 11:55:30 +00:00
{
throw Exception("file not found in cache?" + *it, ErrorCodes::LOGICAL_ERROR);
}
if (cache_it->second.cache_controller->closable())
2021-11-05 11:55:30 +00:00
{
2021-11-29 03:30:11 +00:00
total_size
= total_size > cache_it->second.cache_controller->size() ? total_size - cache_it->second.cache_controller->size() : 0;
cache_it->second.cache_controller->close();
caches.erase(cache_it);
2021-11-05 11:55:30 +00:00
it = keys.erase(it);
2021-11-29 03:30:11 +00:00
LOG_TRACE(
log,
"clear local file {} for {}. key size:{}. next{}",
cache_it->second.cache_controller->getLocalPath().string(),
cache_it->second.cache_controller->getRemotePath(),
keys.size(),
*it);
2021-11-05 11:55:30 +00:00
}
else
break;
}
LOG_TRACE(log, "keys size:{}, total_size:{}, limit size:{}", keys.size(), total_size, limit_size);
return total_size < limit_size;
2021-11-05 11:55:30 +00:00
}
}