ClickHouse/src/Common/FileCache.cpp

626 lines
20 KiB
C++
Raw Normal View History

2022-01-13 11:57:56 +00:00
#include "FileCache.h"
#include <Common/randomSeed.h>
#include <Common/SipHash.h>
#include <Common/hex.h>
#include <IO/ReadHelpers.h>
2022-01-21 15:39:34 +00:00
#include <IO/WriteBufferFromFile.h>
#include <IO/ReadSettings.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
2022-01-13 11:57:56 +00:00
#include <pcg-random/pcg_random.hpp>
#include <filesystem>
namespace fs = std::filesystem;
namespace DB
{
2022-01-21 15:39:34 +00:00
namespace ErrorCodes
{
    /// Error codes thrown by the file cache below.
    extern const int FILE_CACHE_ERROR;
    extern const int LOGICAL_ERROR;
}
2022-01-13 11:57:56 +00:00
namespace
{
2022-01-23 16:51:18 +00:00
String keyToStr(const FileCache::Key & key)
2022-01-13 11:57:56 +00:00
{
return getHexUIntLowercase(key);
}
}
/// Base cache constructor: only stores the configuration, performs no filesystem access.
FileCache::FileCache(const String & cache_base_path_, size_t max_size_, size_t max_element_size_)
    : cache_base_path(cache_base_path_), max_size(max_size_), max_element_size(max_element_size_)
{
}
2022-01-23 16:51:18 +00:00
/// Cache key of a path: SipHash-128 over the raw bytes of the path string.
FileCache::Key FileCache::hash(const String & path)
{
    const char * bytes = path.data();
    const size_t length = path.size();
    return sipHash128(bytes, length);
}
/// On-disk location of one file segment:
/// <cache_base_path>/<first 3 hex chars of key>/<key hex>/<offset>.
String FileCache::path(const Key & key, size_t offset)
{
    const String hex_key = keyToStr(key);
    const fs::path key_directory = fs::path(cache_base_path) / hex_key.substr(0, 3) / hex_key;
    return key_directory / std::to_string(offset);
}
/// Directory holding every segment file of `key`:
/// <cache_base_path>/<first 3 hex chars of key>/<key hex>.
String FileCache::path(const Key & key)
{
    const String hex_key = keyToStr(key);
    const String prefix = hex_key.substr(0, 3);
    return fs::path(cache_base_path) / prefix / hex_key;
}
2022-01-24 22:07:02 +00:00
/// The cache is bypassed unless the current thread is initialized, has a query context
/// and carries a non-empty query id.
bool FileCache::shouldBypassCache()
{
    if (!CurrentThread::isInitialized())
        return true;

    if (!CurrentThread::get().getQueryContext())
        return true;

    return CurrentThread::getQueryId().size == 0;
}
2022-01-13 11:57:56 +00:00
/// Opens an LRU file cache rooted at cache_base_path_. Existing on-disk contents are loaded
/// through restore(); otherwise an empty root directory is created.
LRUFileCache::LRUFileCache(const String & cache_base_path_, size_t max_size_, size_t max_element_size_)
    : FileCache(cache_base_path_, max_size_, max_element_size_)
    , log(&Poco::Logger::get("LRUFileCache"))
{
    const bool cache_dir_present = fs::exists(cache_base_path);
    if (!cache_dir_present)
        fs::create_directories(cache_base_path);
    else
        restore();

    /// remove() only deletes emptied key directories once this flag is set.
    startup_restore_finished = true;
}
void LRUFileCache::useCell(
2022-01-24 22:07:02 +00:00
const FileSegmentCell & cell, FileSegments & result, std::lock_guard<std::mutex> & /* cache_lock */)
2022-01-13 11:57:56 +00:00
{
auto file_segment = cell.file_segment;
if (file_segment->download_state == FileSegment::State::DOWNLOADED
&& fs::file_size(path(file_segment->key(), file_segment->offset())) == 0)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Cannot have zero size downloaded file segments. Current file segment: {}",
file_segment->range().toString());
2022-01-13 11:57:56 +00:00
result.push_back(cell.file_segment);
2022-01-21 15:39:34 +00:00
/**
* A cell receives a queue iterator on first successful space reservation attempt
* (space is reserved incrementally on each read buffer nextImpl() call).
*/
if (cell.queue_iterator)
{
/// Move to the end of the queue. The iterator remains valid.
queue.splice(queue.end(), queue, *cell.queue_iterator);
}
2022-01-13 11:57:56 +00:00
}
/// Finds the cell stored under [key][offset]; nullptr when the key or the offset is unknown.
LRUFileCache::FileSegmentCell * LRUFileCache::getCell(
    const Key & key, size_t offset, std::lock_guard<std::mutex> & /* cache_lock */)
{
    auto key_it = files.find(key);
    if (key_it != files.end())
    {
        auto & cells_by_offset = key_it->second;
        if (auto cell_it = cells_by_offset.find(offset); cell_it != cells_by_offset.end())
            return &cell_it->second;
    }
    return nullptr;
}
/// Given range = [left, right] and the non-overlapping, offset-ordered cells of `key`,
/// returns (in ascending order) every file segment that intersects the range, marking each
/// as used via useCell(). A key entry without cells is dropped via removeFileKey().
FileSegments LRUFileCache::getImpl(
    const Key & key, const FileSegment::Range & range, std::lock_guard<std::mutex> & cache_lock)
{
    auto key_it = files.find(key);
    if (key_it == files.end())
        return {};

    const auto & cells = key_it->second;
    if (cells.empty())
    {
        /// removeFileKey() erases the map entry `cells` refers to, so return immediately.
        removeFileKey(key);
        return {};
    }

    FileSegments result;
    auto first_not_before = cells.lower_bound(range.left);

    if (first_not_before == cells.end())
    {
        /// All cached segments start before range.left, only the last one may reach into it:
        ///     segment{N}                   segment{N}
        ///     [________                    [_______]
        ///         [__________]     OR                  [________]
        ///         ^                                    ^
        ///         range.left                           range.left
        const auto & last_cell = std::prev(cells.end())->second;
        if (last_cell.file_segment->range().right >= range.left)
            useCell(last_cell, result, cache_lock);
        return result;
    }

    /// first_not_before is segment{k}: the first one with offset >= range.left.
    if (first_not_before != cells.begin())
    {
        /// segment{k-1} starts before range.left but may still overlap it:
        ///     segment{k-1}    segment{k}
        ///     [________]      [_____
        ///          [___________
        ///          ^
        ///          range.left
        const auto & prev_cell = std::prev(first_not_before)->second;
        if (prev_cell.file_segment->range().right >= range.left)
            useCell(prev_cell, result, cache_lock);
    }

    /// Then segment{k}, segment{k+1}, ... for as long as they start no later than range.right.
    for (auto it = first_not_before; it != cells.end(); ++it)
    {
        const auto & cell = it->second;
        if (cell.file_segment->range().left > range.right)
            break;
        useCell(cell, result, cache_lock);
    }

    return result;
}
/// Returns the file segments of `key` covering [offset, offset + size - 1]: the cached segments
/// intersecting that range plus newly added EMPTY cells for every uncovered part (hole),
/// ordered by offset. The result is never empty.
/// Throws LOGICAL_ERROR when size == 0.
FileSegmentsHolder LRUFileCache::getOrSet(const Key & key, size_t offset, size_t size)
{
    /// With size == 0 the right bound `offset + size - 1` wraps around, and addCell() returns
    /// nullptr for zero-sized cells, which would then be dereferenced below.
    if (!size)
        throw Exception(ErrorCodes::LOGICAL_ERROR,
                        "Cannot get or set zero-sized file segment. Key: {}, offset: {}", keyToStr(key), offset);

    FileSegment::Range range(offset, offset + size - 1);

    std::lock_guard cache_lock(mutex);

    /// Get all segments which intersect with the given range.
    auto file_segments = getImpl(key, range, cache_lock);

    if (file_segments.empty())
    {
        auto * cell = addCell(key, offset, size, FileSegment::State::EMPTY, cache_lock);
        file_segments = {cell->file_segment};
    }
    else
    {
        /// There are segments [segment1, ..., segmentN]
        /// (non-overlapping, non-empty, ascending-ordered) which (maybe partially)
        /// intersect with given range.
        /// It can have holes:
        /// [____________________]         -- requested range
        ///     [____]  [_]   [_________]  -- intersecting cache [segment1, ..., segmentN]
        ///
        /// For each such hole create a cell with file segment state EMPTY.

        auto it = file_segments.begin();
        auto segment_range = (*it)->range();

        size_t current_pos;
        if (segment_range.left < range.left)
        {
            ///    [_______     -- requested range
            /// [_______
            /// ^
            /// segment1

            current_pos = segment_range.right + 1;
            ++it;
        }
        else
            current_pos = range.left;

        while (current_pos <= range.right && it != file_segments.end())
        {
            segment_range = (*it)->range();

            if (current_pos == segment_range.left)
            {
                current_pos = segment_range.right + 1;
                ++it;
                continue;
            }

            assert(current_pos < segment_range.left);

            /// hole_size > 0 here, so addCell() never returns nullptr.
            auto hole_size = segment_range.left - current_pos;
            auto * cell = addCell(key, current_pos, hole_size, FileSegment::State::EMPTY, cache_lock);
            file_segments.insert(it, cell->file_segment);

            current_pos = segment_range.right + 1;
            ++it;
        }

        if (current_pos <= range.right)
        {
            /// ________]     -- requested range
            /// _____]
            ///      ^
            /// segmentN

            auto hole_size = range.right - current_pos + 1;
            auto * cell = addCell(key, current_pos, hole_size, FileSegment::State::EMPTY, cache_lock);
            file_segments.push_back(cell->file_segment);
        }
    }

    /// TODO: remove this extra debug logging.
    String ranges;
    for (const auto & s : file_segments)
        ranges += "\nRange: " + s->range().toString() + ", download state: " + FileSegment::stateToString(s->download_state) + " ";

    LOG_TEST(log, "Cache getOrSet. Key: {}, range: {}, file_segments number: {}, ranges: {}",
             keyToStr(key), range.toString(), file_segments.size(), ranges);

    assert(!file_segments.empty());
    return FileSegmentsHolder(std::move(file_segments));
}
2022-01-21 15:39:34 +00:00
/// Create a file segment cell and put it in `files` map by [key][offset].
/// Returns nullptr when size == 0 (empty files are not cached).
/// Throws FILE_CACHE_ERROR if a cell for [key][offset] already exists.
LRUFileCache::FileSegmentCell * LRUFileCache::addCell(
    const Key & key, size_t offset, size_t size, FileSegment::State state,
    std::lock_guard<std::mutex> & /* cache_lock */)
{
    if (!size)
        return nullptr; /// Empty files are not cached.

    auto & offsets = files[key];
    if (offsets.contains(offset))
        throw Exception(ErrorCodes::FILE_CACHE_ERROR,
                        "Cache already exists for key: `{}`, offset: {}, size: {}", keyToStr(key), offset, size);

    /// Prepare the key directory BEFORE constructing the cell: constructing a DOWNLOADED cell
    /// links it into the LRU queue, so a throw after that point would leave a queue entry
    /// without a matching cell.
    if (offsets.empty())
    {
        auto key_path = path(key);
        if (!fs::exists(key_path))
            fs::create_directories(key_path);
    }

    auto file_segment = std::make_shared<FileSegment>(offset, size, key, this, state);
    FileSegmentCell cell(std::move(file_segment), queue);

    auto [it, inserted] = offsets.insert({offset, std::move(cell)});
    if (!inserted)
        throw Exception(ErrorCodes::FILE_CACHE_ERROR,
                        "Failed to insert into cache key: `{}`, offset: {}, size: {}", keyToStr(key), offset, size);

    return &(it->second);
}
2022-01-24 22:07:02 +00:00
bool LRUFileCache::tryReserve(const Key & key_, size_t offset_, size_t size, std::lock_guard<std::mutex> & cache_lock)
2022-01-21 15:39:34 +00:00
{
2022-01-23 16:51:18 +00:00
auto removed_size = 0;
size_t queue_size = queue.size();
assert(queue_size <= max_element_size);
2022-01-22 22:56:24 +00:00
/// Since space reservation is incremental, cache cell already exists if it's state is EMPTY.
2022-01-23 16:51:18 +00:00
/// And it cache cell does not exist on startup -- as we first check for space and then add a cell.
auto * cell_for_reserve = getCell(key_, offset_, cache_lock);
2022-01-22 22:56:24 +00:00
2022-01-23 16:51:18 +00:00
/// A cell acquires a LRUQueue iterator on first successful space reservation attempt.
if (!cell_for_reserve || !cell_for_reserve->queue_iterator)
queue_size += 1;
2022-01-13 11:57:56 +00:00
auto is_overflow = [&]
{
return (current_size + size - removed_size > max_size)
|| (max_element_size != 0 && queue_size > max_element_size);
};
2022-01-21 15:39:34 +00:00
std::vector<FileKeyAndOffset> to_evict;
2022-01-13 11:57:56 +00:00
auto key_it = queue.begin();
while (is_overflow() && key_it != queue.end())
{
const auto [key, offset] = *key_it++;
auto * cell = getCell(key, offset, cache_lock);
if (!cell)
2022-01-21 15:39:34 +00:00
throw Exception(ErrorCodes::FILE_CACHE_ERROR,
2022-01-13 11:57:56 +00:00
"Cache became inconsistent. Key: {}, offset: {}", keyToStr(key), offset);
size_t cell_size = cell->size();
/// It is guaranteed that cell is not removed from cache as long as
/// pointer to corresponding file segment is hold by any other thread.
if (cell->releasable())
{
2022-01-23 16:51:18 +00:00
switch (cell->file_segment->download_state)
2022-01-13 11:57:56 +00:00
{
case FileSegment::State::DOWNLOADED:
{
/// Cell will actually be removed only if
/// we managed to reserve enough space.
2022-01-21 15:39:34 +00:00
to_evict.emplace_back(key, offset);
break;
}
2022-01-13 11:57:56 +00:00
default:
{
2022-01-21 15:39:34 +00:00
remove(key, offset, cache_lock);
2022-01-13 11:57:56 +00:00
break;
}
}
removed_size += cell_size;
--queue_size;
}
}
if (is_overflow())
return false;
2022-01-23 16:51:18 +00:00
if (cell_for_reserve && !cell_for_reserve->queue_iterator)
cell_for_reserve->queue_iterator = queue.insert(queue.end(), std::make_pair(key_, offset_));
2022-01-21 15:39:34 +00:00
for (auto & [key, offset] : to_evict)
remove(key, offset, cache_lock);
2022-01-13 11:57:56 +00:00
current_size += size - removed_size;
if (current_size > (1ull << 63))
2022-01-21 15:39:34 +00:00
throw Exception(ErrorCodes::FILE_CACHE_ERROR, "Cache became inconsistent. There must be a bug");
2022-01-13 11:57:56 +00:00
return true;
}
2022-01-21 15:39:34 +00:00
void LRUFileCache::remove(
2022-01-24 22:07:02 +00:00
Key key, size_t offset, std::lock_guard<std::mutex> & cache_lock)
2022-01-13 11:57:56 +00:00
{
2022-01-21 15:39:34 +00:00
LOG_TEST(log, "Remove. Key: {}, offset: {}", keyToStr(key), offset);
2022-01-13 11:57:56 +00:00
auto * cell = getCell(key, offset, cache_lock);
if (!cell)
throw Exception(ErrorCodes::FILE_CACHE_ERROR, "No cache cell for key: {}, offset: {}", keyToStr(key), offset);
if (cell->queue_iterator)
queue.erase(*cell->queue_iterator);
2022-01-13 11:57:56 +00:00
2022-01-21 15:39:34 +00:00
auto & offsets = files[key];
offsets.erase(offset);
2022-01-13 11:57:56 +00:00
auto cache_file_path = path(key, offset);
if (fs::exists(cache_file_path))
{
std::cerr << "\n\n\nRemoving cache file for file segment key: " << keyToStr(key) << " and offset: " << offset << "\n\n\n";
2022-01-13 11:57:56 +00:00
try
{
fs::remove(cache_file_path);
2022-01-21 15:39:34 +00:00
if (startup_restore_finished && offsets.empty())
removeFileKey(key);
2022-01-13 11:57:56 +00:00
}
catch (...)
{
2022-01-21 15:39:34 +00:00
throw Exception(ErrorCodes::FILE_CACHE_ERROR,
2022-01-13 11:57:56 +00:00
"Removal of cached file failed. Key: {}, offset: {}, path: {}, error: {}",
keyToStr(key), offset, cache_file_path, getCurrentExceptionMessage(false));
}
}
}
void LRUFileCache::restore()
{
std::lock_guard cache_lock(mutex);
Key key;
UInt64 offset;
size_t size;
std::vector<FileSegmentCell *> cells;
/// cache_base_path / key_prefix / key / offset
fs::directory_iterator key_prefix_it{cache_base_path};
for (; key_prefix_it != fs::directory_iterator(); ++key_prefix_it)
{
fs::directory_iterator key_it{key_prefix_it->path()};
for (; key_it != fs::directory_iterator(); ++key_it)
{
2022-01-21 15:39:34 +00:00
key = unhexUInt<UInt128>(key_it->path().filename().string().data());
2022-01-13 11:57:56 +00:00
fs::directory_iterator offset_it{key_it->path()};
for (; offset_it != fs::directory_iterator(); ++offset_it)
{
bool parsed = tryParse<UInt64>(offset, offset_it->path().filename());
if (!parsed)
2022-01-23 16:51:18 +00:00
{
LOG_WARNING(log, "Unexpected file: ", offset_it->path().string());
continue; /// Or remove? Some unexpected file.
}
2022-01-13 11:57:56 +00:00
size = offset_it->file_size();
2022-01-23 16:51:18 +00:00
if (!size)
{
fs::remove(offset_it->path());
continue;
}
2022-01-13 11:57:56 +00:00
2022-01-23 16:51:18 +00:00
if (tryReserve(key, offset, size, cache_lock))
2022-01-13 11:57:56 +00:00
{
2022-01-23 16:51:18 +00:00
auto * cell = addCell(key, offset, size, FileSegment::State::DOWNLOADED, cache_lock);
if (cell)
cells.push_back(cell);
2022-01-13 11:57:56 +00:00
}
else
{
LOG_WARNING(log,
"Cache capacity changed (max size: {}, available: {}), cached file `{}` does not fit in cache anymore (size: {})",
2022-01-24 22:07:02 +00:00
max_size, availableSize(), key_it->path().string(), size);
2022-01-23 16:51:18 +00:00
fs::remove(offset_it->path());
2022-01-13 11:57:56 +00:00
}
}
}
}
/// Shuffle cells to have random order in LRUQueue as at startup all cells have the same priority.
pcg64 generator(randomSeed());
std::shuffle(cells.begin(), cells.end(), generator);
for (const auto & cell : cells)
2022-01-21 15:39:34 +00:00
queue.splice(queue.end(), queue, *cell->queue_iterator);
2022-01-13 11:57:56 +00:00
}
/// Removes every cell of `key` from the cache; does nothing if the key is not cached.
void LRUFileCache::remove(const Key & key)
{
    std::lock_guard cache_lock(mutex);

    auto it = files.find(key);
    if (it == files.end())
        return;

    /// remove(key, offset, ...) erases the cell from this very map (and may erase the whole
    /// `files` entry via removeFileKey()), so iterating the map while removing would advance
    /// an invalidated iterator. Snapshot the offsets first.
    std::vector<size_t> offsets;
    offsets.reserve(it->second.size());
    for (const auto & offset_and_cell : it->second)
        offsets.push_back(offset_and_cell.first);

    for (const size_t offset : offsets)
        remove(key, offset, cache_lock);
}
/// Forgets `key` entirely: erases its entry from `files` and deletes its directory if present.
/// NOTE: std::filesystem::remove fails on a non-empty directory, so callers are expected to
/// have removed the segment files first.
void LRUFileCache::removeFileKey(const Key & key)
{
    const auto key_directory = path(key);
    files.erase(key);

    if (!fs::exists(key_directory))
        return;

    fs::remove(key_directory);
}
/// Snapshot of the cache: number of queued cells, available space, and how many queued cells
/// are DOWNLOADED / DOWNLOADING (counts of cells, not bytes).
/// Throws FILE_CACHE_ERROR if a queue entry has no matching cell.
LRUFileCache::Stat LRUFileCache::getStat()
{
    std::lock_guard cache_lock(mutex);

    Stat stat
    {
        .size = queue.size(),
        .available = availableSize(),
        .downloaded_size = 0,
        .downloading_size = 0,
    };

    for (const auto & [key, offset] : queue)
    {
        const auto * cell = getCell(key, offset, cache_lock);
        if (!cell)
            throw Exception(ErrorCodes::FILE_CACHE_ERROR,
                            "Cache became inconsistent. Key: {}, offset: {}", keyToStr(key), offset);

        const FileSegment::State state = cell->file_segment->download_state;
        if (state == FileSegment::State::DOWNLOADED)
            ++stat.downloaded_size;
        else if (state == FileSegment::State::DOWNLOADING)
            ++stat.downloading_size;
    }

    return stat;
}
2022-01-22 22:56:24 +00:00
void LRUFileCache::reduceSizeToDownloaded(
2022-01-24 22:07:02 +00:00
const Key & key, size_t offset, std::lock_guard<std::mutex> & cache_lock)
2022-01-21 15:39:34 +00:00
{
2022-01-22 22:56:24 +00:00
/**
* In case file was partially downloaded and it's download cannot be continued
* because of no space left in cache, we need to be able to cut cell's size to downloaded_size.
*/
2022-01-21 15:39:34 +00:00
2022-01-22 22:56:24 +00:00
auto * cell = getCell(key, offset, cache_lock);
2022-01-21 15:39:34 +00:00
2022-01-22 22:56:24 +00:00
if (!cell)
throw Exception(ErrorCodes::FILE_CACHE_ERROR, "No cell found for key: {}, offset: {}", keyToStr(key), offset);
2022-01-21 15:39:34 +00:00
2022-01-22 22:56:24 +00:00
const auto & file_segment = cell->file_segment;
2022-01-21 15:39:34 +00:00
2022-01-22 22:56:24 +00:00
size_t downloaded_size = file_segment->downloaded_size;
if (downloaded_size == file_segment->range().size())
throw Exception(ErrorCodes::FILE_CACHE_ERROR,
"Nothing to reduce, file segment fully downloaded, key: {}, offset: {}", keyToStr(key), offset);
2022-01-13 11:57:56 +00:00
2022-01-22 22:56:24 +00:00
cell->file_segment = std::make_shared<FileSegment>(offset, downloaded_size, key, this, FileSegment::State::DOWNLOADED);
2022-01-21 15:39:34 +00:00
}
2022-01-23 16:51:18 +00:00
bool LRUFileCache::isLastFileSegmentHolder(
2022-01-24 22:07:02 +00:00
const Key & key, size_t offset, std::lock_guard<std::mutex> & cache_lock)
2022-01-21 15:39:34 +00:00
{
2022-01-23 16:51:18 +00:00
auto * cell = getCell(key, offset, cache_lock);
if (!cell)
throw Exception(ErrorCodes::LOGICAL_ERROR, "No cell found for key: {}, offset: {}", keyToStr(key), offset);
/// The caller of this method is last file segment holder if use count is 2 and the second
/// pointer is cache itself,
return cell->file_segment.use_count() == 2;
2022-01-13 11:57:56 +00:00
}
2022-01-22 22:56:24 +00:00
/// Creates a cache cell owning `file_segment_`.
/// - A DOWNLOADED cell is appended to `queue_` immediately.
/// - An EMPTY cell is not queued yet.
/// - Any other state throws FILE_CACHE_ERROR.
LRUFileCache::FileSegmentCell::FileSegmentCell(FileSegmentPtr file_segment_, LRUQueue & queue_)
    : file_segment(std::move(file_segment_)) /// Sink parameter: move instead of an extra refcount round-trip.
{
    /**
     * Cell can be created with either DOWNLOADED or EMPTY file segment's state.
     * File segment acquires DOWNLOADING state on first successful getOrSetDownloader call,
     * and an EMPTY cell gets its LRUQueue iterator on its first successful space reservation
     * (LRUFileCache::tryReserve).
     */

    switch (file_segment->download_state)
    {
        case FileSegment::State::DOWNLOADED:
        {
            queue_iterator = queue_.insert(queue_.end(), getKeyAndOffset());
            break;
        }
        case FileSegment::State::EMPTY:
        {
            break;
        }
        default:
            throw Exception(ErrorCodes::FILE_CACHE_ERROR,
                            "Can create cell with either DOWNLOADED or EMPTY state, got: {}",
                            FileSegment::stateToString(file_segment->download_state));
    }
}
2022-01-26 09:35:46 +00:00
/// Debug helper: lists the ranges and download states of all queued cells of `key_`, in LRU
/// queue order, separated by ", ".
/// Throws FILE_CACHE_ERROR if a queue entry has no matching cell.
String LRUFileCache::dumpStructure(const Key & key_)
{
    std::lock_guard cache_lock(mutex);

    WriteBufferFromOwnString result;
    bool is_first = true;
    for (const auto & [key, offset] : queue)
    {
        if (key == key_)
        {
            auto * cell = getCell(key, offset, cache_lock);
            if (!cell)
                throw Exception(ErrorCodes::FILE_CACHE_ERROR,
                                "Cache became inconsistent. Key: {}, offset: {}", keyToStr(key), offset);

            /// The separator depends on whether anything was written yet, not on the queue position:
            /// entries of other keys may precede the first matching one.
            result << (is_first ? "" : ", ") << cell->file_segment->range().toString();
            result << "(state: " << cell->file_segment->download_state << ")";
            is_first = false;
        }
    }

    return result.str();
}
2022-01-13 11:57:56 +00:00
}