2022-01-13 11:57:56 +00:00
# include "FileCache.h"
# include <Common/randomSeed.h>
# include <Common/SipHash.h>
# include <Common/hex.h>
2022-03-14 19:15:07 +00:00
# include <Common/FileCacheSettings.h>
2022-01-13 11:57:56 +00:00
# include <IO/ReadHelpers.h>
2022-01-21 15:39:34 +00:00
# include <IO/WriteBufferFromFile.h>
# include <IO/ReadSettings.h>
# include <IO/WriteBufferFromString.h>
# include <IO/Operators.h>
2022-01-13 11:57:56 +00:00
# include <pcg-random/pcg_random.hpp>
# include <filesystem>
namespace fs = std : : filesystem ;
namespace DB
{
2022-01-21 15:39:34 +00:00
/// Error codes thrown from this file. Only declared here; the values are defined elsewhere.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
    extern const int REMOTE_FS_OBJECT_CACHE_ERROR;
}
2022-01-13 11:57:56 +00:00
namespace
{
2022-02-18 15:38:23 +00:00
/// Renders a cache key as a lowercase hexadecimal string (used for paths and log messages).
String keyToStr(const IFileCache::Key & key)
{
    String hex_key = getHexUIntLowercase(key);
    return hex_key;
}
}
2022-02-18 15:38:23 +00:00
/// Remembers the cache root directory and copies the limits out of the settings object:
/// total size, number of elements and the maximum size of a single file segment.
IFileCache::IFileCache(
    const String & cache_base_path_,
    const FileCacheSettings & cache_settings_)
    : cache_base_path(cache_base_path_)
    , max_size(cache_settings_.max_size)
    , max_element_size(cache_settings_.max_elements)
    , max_file_segment_size(cache_settings_.max_file_segment_size)
{
}
2022-02-18 15:38:23 +00:00
/// Computes the cache key of a path: the 128-bit SipHash of the path bytes.
IFileCache::Key IFileCache::hash(const String & path)
{
    const char * path_bytes = path.data();
    const size_t path_length = path.size();
    return sipHash128(path_bytes, path_length);
}
2022-02-18 15:38:23 +00:00
/// Path of a single cached file segment:
/// cache_base_path / <first 3 hex chars of key> / <hex key> / <offset>.
String IFileCache::getPathInLocalCache(const Key & key, size_t offset)
{
    const String hex_key = keyToStr(key);

    fs::path segment_path(cache_base_path);
    segment_path /= hex_key.substr(0, 3);
    segment_path /= hex_key;
    segment_path /= std::to_string(offset);
    return segment_path;
}
2022-02-18 15:38:23 +00:00
/// Path of the directory holding all segments of one key:
/// cache_base_path / <first 3 hex chars of key> / <hex key>.
String IFileCache::getPathInLocalCache(const Key & key)
{
    const String hex_key = keyToStr(key);

    fs::path key_dir(cache_base_path);
    key_dir /= hex_key.substr(0, 3);
    key_dir /= hex_key;
    return key_dir;
}
2022-04-07 16:46:46 +00:00
bool IFileCache : : isReadOnly ( )
2022-01-24 22:07:02 +00:00
{
return ! CurrentThread : : isInitialized ( )
| | ! CurrentThread : : get ( ) . getQueryContext ( )
2022-02-15 09:11:33 +00:00
| | CurrentThread : : getQueryId ( ) . size = = 0 ;
2022-01-24 22:07:02 +00:00
}
2022-03-09 09:36:52 +00:00
void IFileCache : : assertInitialized ( ) const
{
if ( ! is_initialized )
throw Exception ( ErrorCodes : : REMOTE_FS_OBJECT_CACHE_ERROR , " Cache not initialized " ) ;
}
2022-03-14 19:15:07 +00:00
/// Forwards path and settings to the base class and obtains the logger for this cache.
/// Nothing is loaded from disk here; that happens in initialize().
LRUFileCache::LRUFileCache(
    const String & cache_base_path_,
    const FileCacheSettings & cache_settings_)
    : IFileCache(cache_base_path_, cache_settings_)
    , log(&Poco::Logger::get(" LRUFileCache "))
{
}
void LRUFileCache : : initialize ( )
2022-01-13 11:57:56 +00:00
{
if ( fs : : exists ( cache_base_path ) )
2022-02-18 15:38:23 +00:00
loadCacheInfoIntoMemory ( ) ;
2022-01-13 11:57:56 +00:00
else
fs : : create_directories ( cache_base_path ) ;
2022-01-21 15:39:34 +00:00
2022-03-01 17:12:34 +00:00
is_initialized = true ;
2022-01-13 11:57:56 +00:00
}
void LRUFileCache : : useCell (
2022-01-24 22:07:02 +00:00
const FileSegmentCell & cell , FileSegments & result , std : : lock_guard < std : : mutex > & /* cache_lock */ )
2022-01-13 11:57:56 +00:00
{
2022-01-30 11:35:28 +00:00
auto file_segment = cell . file_segment ;
2022-02-23 10:12:14 +00:00
if ( file_segment - > isDownloaded ( )
2022-02-18 15:38:23 +00:00
& & fs : : file_size ( getPathInLocalCache ( file_segment - > key ( ) , file_segment - > offset ( ) ) ) = = 0 )
2022-01-30 11:35:28 +00:00
throw Exception ( ErrorCodes : : LOGICAL_ERROR ,
" Cannot have zero size downloaded file segments. Current file segment: {} " ,
file_segment - > range ( ) . toString ( ) ) ;
2022-01-13 11:57:56 +00:00
result . push_back ( cell . file_segment ) ;
2022-01-21 15:39:34 +00:00
/**
* A cell receives a queue iterator on first successful space reservation attempt
* ( space is reserved incrementally on each read buffer nextImpl ( ) call ) .
*/
if ( cell . queue_iterator )
{
/// Move to the end of the queue. The iterator remains valid.
queue . splice ( queue . end ( ) , queue , * cell . queue_iterator ) ;
}
2022-01-13 11:57:56 +00:00
}
/// Looks up the cell stored under [key][offset].
/// Returns nullptr when either the key or the exact offset is not present.
LRUFileCache::FileSegmentCell * LRUFileCache::getCell(
    const Key & key, size_t offset, std::lock_guard<std::mutex> & /* cache_lock */)
{
    auto key_it = files.find(key);
    if (key_it != files.end())
    {
        auto & cells_by_offset = key_it->second;
        auto offset_it = cells_by_offset.find(offset);
        if (offset_it != cells_by_offset.end())
            return &offset_it->second;
    }

    return nullptr;
}
/// Given range = [left, right] and the ordered, non-overlapping set of file segments cached for `key`,
/// returns the list [segment1, ..., segmentN] of segments intersecting the range (possibly empty).
/// Every returned segment is passed through useCell(), i.e. it is moved to the end of the LRU queue.
/// A key that has no cells left is erased together with its directory.
FileSegments LRUFileCache::getImpl(
    const Key & key, const FileSegment::Range & range, std::lock_guard<std::mutex> & cache_lock)
{
    auto files_it = files.find(key);
    if (files_it == files.end())
        return {};

    const auto & cells_by_offset = files_it->second;
    if (cells_by_offset.empty())
    {
        auto key_path = getPathInLocalCache(key);

        files.erase(key);

        if (fs::exists(key_path))
            fs::remove(key_path);

        return {};
    }

    FileSegments intersecting;

    /// First cell whose offset is >= range.left.
    auto first_not_before = cells_by_offset.lower_bound(range.left);

    if (first_not_before == cells_by_offset.end())
    {
        /// Every cached segment starts before range.left, so only the last one can reach into the range:
        ///   segment{N}                       segment{N}
        ///   [________                         [_______]
        ///       [__________]         OR                  [________]
        ///       ^                                        ^
        ///       range.left                               range.left
        const auto & last_cell = cells_by_offset.rbegin()->second;
        if (last_cell.file_segment->range().right >= range.left)
            useCell(last_cell, intersecting, cache_lock);

        return intersecting;
    }

    /// The segment right before `first_not_before` starts before range.left but may still overlap it:
    ///   segment{k-1}  segment{k}
    ///   [________]   [_____
    ///       [___________
    ///       ^
    ///       range.left
    if (first_not_before != cells_by_offset.begin())
    {
        const auto & preceding_cell = std::prev(first_not_before)->second;
        const auto & preceding_range = preceding_cell.file_segment->range();

        if (range.left <= preceding_range.right)
            useCell(preceding_cell, intersecting, cache_lock);
    }

    /// Take consecutive segments until one starts after range.right.
    for (auto cell_it = first_not_before; cell_it != cells_by_offset.end(); ++cell_it)
    {
        const auto & cell = cell_it->second;
        if (range.right < cell.file_segment->range().left)
            break;

        useCell(cell, intersecting, cache_lock);
    }

    return intersecting;
}
2022-03-21 11:30:25 +00:00
/// Covers [offset, offset + size) with consecutive new cells, each at most `max_file_segment_size` bytes,
/// all created in the given `state`. Returns the file segments of the created cells in ascending order.
FileSegments LRUFileCache::splitRangeIntoCells(
    const Key & key, size_t offset, size_t size, FileSegment::State state, std::lock_guard<std::mutex> & cache_lock)
{
    assert(size > 0);

    const auto range_end = offset + size; /// Not included in the range.
    size_t bytes_left = size;
    FileSegments created;

    for (auto pos = offset; pos < range_end;)
    {
        const size_t cell_size = std::min(bytes_left, max_file_segment_size);
        bytes_left -= cell_size;

        auto * cell = addCell(key, pos, cell_size, state, cache_lock);
        if (cell)
            created.push_back(cell->file_segment);
        assert(cell);

        pos += cell_size;
    }

    assert(created.empty() || offset + size - 1 == created.back()->range().right);
    return created;
}
2022-04-11 15:51:49 +00:00
void LRUFileCache : : fillHolesWithEmptyFileSegments (
FileSegments & file_segments , const Key & key , const FileSegment : : Range & range , bool fill_with_detached_file_segments , std : : lock_guard < std : : mutex > & cache_lock )
{
/// There are segments [segment1, ..., segmentN]
/// (non-overlapping, non-empty, ascending-ordered) which (maybe partially)
/// intersect with given range.
/// It can have holes:
/// [____________________] -- requested range
/// [____] [_] [_________] -- intersecting cache [segment1, ..., segmentN]
///
/// For each such hole create a cell with file segment state EMPTY.
auto it = file_segments . begin ( ) ;
auto segment_range = ( * it ) - > range ( ) ;
size_t current_pos ;
if ( segment_range . left < range . left )
{
/// [_______ -- requested range
/// [_______
/// ^
/// segment1
current_pos = segment_range . right + 1 ;
+ + it ;
}
else
current_pos = range . left ;
while ( current_pos < = range . right & & it ! = file_segments . end ( ) )
{
segment_range = ( * it ) - > range ( ) ;
if ( current_pos = = segment_range . left )
{
current_pos = segment_range . right + 1 ;
+ + it ;
continue ;
}
assert ( current_pos < segment_range . left ) ;
auto hole_size = segment_range . left - current_pos ;
if ( fill_with_detached_file_segments )
{
auto file_segment = std : : make_shared < FileSegment > ( current_pos , hole_size , key , this , FileSegment : : State : : EMPTY ) ;
file_segment - > detached = true ;
file_segments . insert ( it , file_segment ) ;
}
else
{
file_segments . splice ( it , splitRangeIntoCells ( key , current_pos , hole_size , FileSegment : : State : : EMPTY , cache_lock ) ) ;
}
current_pos = segment_range . right + 1 ;
+ + it ;
}
if ( current_pos < = range . right )
{
/// ________] -- requested range
/// _____]
/// ^
/// segmentN
auto hole_size = range . right - current_pos + 1 ;
if ( fill_with_detached_file_segments )
{
auto file_segment = std : : make_shared < FileSegment > ( current_pos , hole_size , key , this , FileSegment : : State : : EMPTY ) ;
file_segment - > detached = true ;
file_segments . insert ( file_segments . end ( ) , file_segment ) ;
}
else
{
file_segments . splice ( file_segments . end ( ) , splitRangeIntoCells ( key , current_pos , hole_size , FileSegment : : State : : EMPTY , cache_lock ) ) ;
}
}
}
2022-01-13 11:57:56 +00:00
/// Returns file segments covering [offset, offset + size - 1] for `key`.
/// Parts of the range that are not cached yet get new EMPTY cache cells.
/// Throws if the cache is not initialized.
FileSegmentsHolder LRUFileCache::getOrSet(const Key & key, size_t offset, size_t size)
{
    assertInitialized();

    FileSegment::Range range(offset, offset + size - 1);

    std::lock_guard cache_lock(mutex);

#ifndef NDEBUG
    assertCacheCorrectness(key, cache_lock);
#endif

    /// Get all segments which intersect with the given range.
    auto file_segments = getImpl(key, range, cache_lock);

    if (!file_segments.empty())
        fillHolesWithEmptyFileSegments(file_segments, key, range, false, cache_lock);
    else
        file_segments = splitRangeIntoCells(key, offset, size, FileSegment::State::EMPTY, cache_lock);

    assert(!file_segments.empty());
    return FileSegmentsHolder(std::move(file_segments));
}
2022-01-13 11:57:56 +00:00
2022-04-11 15:51:49 +00:00
/// Returns file segments covering [offset, offset + size - 1] for `key`.
/// Unlike getOrSet(), uncached parts do not go through splitRangeIntoCells()/addCell():
/// they are represented by standalone EMPTY segments marked `detached`.
/// Throws if the cache is not initialized.
FileSegmentsHolder LRUFileCache::get(const Key & key, size_t offset, size_t size)
{
    assertInitialized();

    FileSegment::Range range(offset, offset + size - 1);

    std::lock_guard cache_lock(mutex);

#ifndef NDEBUG
    assertCacheCorrectness(key, cache_lock);
#endif

    /// Get all segments which intersect with the given range.
    auto file_segments = getImpl(key, range, cache_lock);

    if (!file_segments.empty())
    {
        fillHolesWithEmptyFileSegments(file_segments, key, range, true, cache_lock);
    }
    else
    {
        /// Nothing cached for this range: hand out one detached segment for all of it.
        auto detached_segment = std::make_shared<FileSegment>(offset, size, key, this, FileSegment::State::EMPTY);
        detached_segment->detached = true;
        file_segments = { detached_segment };
    }

    return FileSegmentsHolder(std::move(file_segments));
}
2022-01-21 15:39:34 +00:00
/// Creates a file segment cell and puts it into the `files` map by [key][offset].
/// Returns nullptr for size == 0 (empty files are not cached), otherwise a pointer to the stored cell.
/// Throws REMOTE_FS_OBJECT_CACHE_ERROR if a cell already exists at this offset
/// and LOGICAL_ERROR if the insertion fails nevertheless.
/// The key directory is created on disk when the first cell of a key is added.
LRUFileCache::FileSegmentCell * LRUFileCache::addCell(
    const Key & key, size_t offset, size_t size, FileSegment::State state,
    std::lock_guard<std::mutex> & cache_lock)
{
    if (!size)
        return nullptr; /// Empty files are not cached.

    auto & cells_by_offset = files[key];

    if (cells_by_offset.contains(offset))
        throw Exception(
            ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
            " Cache already exists for key: `{}`, offset: {}, size: {}. \n Current cache structure: {} ",
            keyToStr(key), offset, size, dumpStructureImpl(key, cache_lock));

    auto new_segment = std::make_shared<FileSegment>(offset, size, key, this, state);
    FileSegmentCell new_cell(std::move(new_segment), queue);

    if (cells_by_offset.empty())
    {
        auto key_path = getPathInLocalCache(key);
        if (!fs::exists(key_path))
            fs::create_directories(key_path);
    }

    auto [cell_it, was_inserted] = cells_by_offset.insert({offset, std::move(new_cell)});
    if (!was_inserted)
        throw Exception(
            ErrorCodes::LOGICAL_ERROR,
            " Failed to insert into cache key: `{}`, offset: {}, size: {} ",
            keyToStr(key), offset, size);

    return &(cell_it->second);
}
2022-03-21 11:30:25 +00:00
/// Creates cells in DOWNLOADING state covering [offset, offset + size) and returns their segments.
/// Throws REMOTE_FS_OBJECT_CACHE_ERROR if a cell already exists exactly at `offset`.
FileSegmentsHolder LRUFileCache::setDownloading(const Key & key, size_t offset, size_t size)
{
    std::lock_guard cache_lock(mutex);

    if (getCell(key, offset, cache_lock) != nullptr)
        throw Exception(
            ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
            " Cache cell already exists for key `{}` and offset {} ",
            keyToStr(key), offset);

    return FileSegmentsHolder(splitRangeIntoCells(key, offset, size, FileSegment::State::DOWNLOADING, cache_lock));
}
2022-02-23 10:12:14 +00:00
bool LRUFileCache : : tryReserve (
const Key & key_ , size_t offset_ , size_t size , std : : lock_guard < std : : mutex > & cache_lock )
2022-01-21 15:39:34 +00:00
{
2022-01-23 16:51:18 +00:00
auto removed_size = 0 ;
size_t queue_size = queue . size ( ) ;
assert ( queue_size < = max_element_size ) ;
2022-01-22 22:56:24 +00:00
2022-01-30 11:35:28 +00:00
/// Since space reservation is incremental, cache cell already exists if it's state is EMPTY.
2022-01-23 16:51:18 +00:00
/// And it cache cell does not exist on startup -- as we first check for space and then add a cell.
auto * cell_for_reserve = getCell ( key_ , offset_ , cache_lock ) ;
2022-01-22 22:56:24 +00:00
2022-01-23 16:51:18 +00:00
/// A cell acquires a LRUQueue iterator on first successful space reservation attempt.
2022-02-21 12:54:03 +00:00
/// cell_for_reserve can be nullptr here when we call tryReserve() from loadCacheInfoIntoMemory().
2022-01-23 16:51:18 +00:00
if ( ! cell_for_reserve | | ! cell_for_reserve - > queue_iterator )
queue_size + = 1 ;
2022-01-13 11:57:56 +00:00
auto is_overflow = [ & ]
{
2022-03-21 13:56:38 +00:00
/// max_size == 0 means unlimited cache size, max_element_size means unlimited number of cache elements.
2022-03-14 19:15:07 +00:00
return ( max_size ! = 0 & & current_size + size - removed_size > max_size )
2022-01-13 11:57:56 +00:00
| | ( max_element_size ! = 0 & & queue_size > max_element_size ) ;
} ;
2022-02-23 10:12:14 +00:00
std : : vector < FileSegmentCell * > to_evict ;
2022-01-13 11:57:56 +00:00
auto key_it = queue . begin ( ) ;
while ( is_overflow ( ) & & key_it ! = queue . end ( ) )
{
2022-02-24 14:20:51 +00:00
const auto [ key , offset ] = * key_it ;
+ + key_it ;
2022-01-13 11:57:56 +00:00
auto * cell = getCell ( key , offset , cache_lock ) ;
if ( ! cell )
2022-02-18 15:38:23 +00:00
throw Exception ( ErrorCodes : : REMOTE_FS_OBJECT_CACHE_ERROR ,
2022-01-13 11:57:56 +00:00
" Cache became inconsistent. Key: {}, offset: {} " , keyToStr ( key ) , offset ) ;
size_t cell_size = cell - > size ( ) ;
/// It is guaranteed that cell is not removed from cache as long as
/// pointer to corresponding file segment is hold by any other thread.
if ( cell - > releasable ( ) )
{
2022-02-23 10:12:14 +00:00
auto & file_segment = cell - > file_segment ;
std : : lock_guard segment_lock ( file_segment - > mutex ) ;
switch ( file_segment - > download_state )
2022-01-13 11:57:56 +00:00
{
case FileSegment : : State : : DOWNLOADED :
{
/// Cell will actually be removed only if
/// we managed to reserve enough space.
2022-02-23 10:12:14 +00:00
to_evict . push_back ( cell ) ;
2022-01-21 15:39:34 +00:00
break ;
}
2022-01-13 11:57:56 +00:00
default :
{
2022-02-23 10:12:14 +00:00
remove ( key , offset , cache_lock , segment_lock ) ;
2022-01-13 11:57:56 +00:00
break ;
}
}
removed_size + = cell_size ;
- - queue_size ;
}
}
if ( is_overflow ( ) )
return false ;
2022-01-23 16:51:18 +00:00
if ( cell_for_reserve & & ! cell_for_reserve - > queue_iterator )
cell_for_reserve - > queue_iterator = queue . insert ( queue . end ( ) , std : : make_pair ( key_ , offset_ ) ) ;
2022-02-23 10:12:14 +00:00
for ( auto & cell : to_evict )
{
2022-03-01 16:00:54 +00:00
auto file_segment = cell - > file_segment ;
2022-02-24 14:20:51 +00:00
if ( file_segment )
{
std : : lock_guard < std : : mutex > segment_lock ( file_segment - > mutex ) ;
remove ( file_segment - > key ( ) , file_segment - > offset ( ) , cache_lock , segment_lock ) ;
}
2022-02-23 10:12:14 +00:00
}
2022-01-13 11:57:56 +00:00
current_size + = size - removed_size ;
if ( current_size > ( 1ull < < 63 ) )
2022-02-18 15:38:23 +00:00
throw Exception ( ErrorCodes : : LOGICAL_ERROR , " Cache became inconsistent. There must be a bug " ) ;
2022-01-13 11:57:56 +00:00
return true ;
}
2022-03-07 13:30:57 +00:00
void LRUFileCache : : remove ( const Key & key )
{
2022-03-09 09:36:52 +00:00
assertInitialized ( ) ;
2022-03-07 13:30:57 +00:00
std : : lock_guard cache_lock ( mutex ) ;
auto it = files . find ( key ) ;
if ( it = = files . end ( ) )
return ;
auto & offsets = it - > second ;
std : : vector < FileSegmentCell * > to_remove ;
to_remove . reserve ( offsets . size ( ) ) ;
for ( auto & [ offset , cell ] : offsets )
to_remove . push_back ( & cell ) ;
for ( auto & cell : to_remove )
{
2022-03-10 09:56:48 +00:00
if ( ! cell - > releasable ( ) )
throw Exception ( ErrorCodes : : LOGICAL_ERROR , " Cannot remove file from cache because someone reads from it. File segment info: {} " , cell - > file_segment - > getInfoForLog ( ) ) ;
2022-03-07 13:30:57 +00:00
auto file_segment = cell - > file_segment ;
if ( file_segment )
{
std : : lock_guard < std : : mutex > segment_lock ( file_segment - > mutex ) ;
remove ( file_segment - > key ( ) , file_segment - > offset ( ) , cache_lock , segment_lock ) ;
}
}
auto key_path = getPathInLocalCache ( key ) ;
files . erase ( key ) ;
if ( fs : : exists ( key_path ) )
fs : : remove ( key_path ) ;
}
2022-03-21 11:30:25 +00:00
void LRUFileCache : : tryRemoveAll ( )
{
/// Try remove all cached files by cache_base_path.
/// Only releasable file segments are evicted.
std : : lock_guard cache_lock ( mutex ) ;
for ( auto it = queue . begin ( ) ; it ! = queue . end ( ) ; )
{
auto & [ key , offset ] = * it + + ;
auto * cell = getCell ( key , offset , cache_lock ) ;
if ( cell - > releasable ( ) )
{
auto file_segment = cell - > file_segment ;
if ( file_segment )
{
std : : lock_guard < std : : mutex > segment_lock ( file_segment - > mutex ) ;
remove ( file_segment - > key ( ) , file_segment - > offset ( ) , cache_lock , segment_lock ) ;
}
}
}
}
2022-01-21 15:39:34 +00:00
void LRUFileCache : : remove (
2022-02-23 10:12:14 +00:00
Key key , size_t offset ,
std : : lock_guard < std : : mutex > & cache_lock , std : : lock_guard < std : : mutex > & /* segment_lock */ )
2022-01-13 11:57:56 +00:00
{
2022-01-21 15:39:34 +00:00
LOG_TEST ( log , " Remove. Key: {}, offset: {} " , keyToStr ( key ) , offset ) ;
2022-01-13 11:57:56 +00:00
2022-01-23 17:33:22 +00:00
auto * cell = getCell ( key , offset , cache_lock ) ;
if ( ! cell )
2022-02-18 15:38:23 +00:00
throw Exception ( ErrorCodes : : REMOTE_FS_OBJECT_CACHE_ERROR , " No cache cell for key: {}, offset: {} " , keyToStr ( key ) , offset ) ;
2022-01-23 17:33:22 +00:00
if ( cell - > queue_iterator )
queue . erase ( * cell - > queue_iterator ) ;
2022-01-13 11:57:56 +00:00
2022-01-21 15:39:34 +00:00
auto & offsets = files [ key ] ;
offsets . erase ( offset ) ;
2022-01-13 11:57:56 +00:00
2022-02-18 15:38:23 +00:00
auto cache_file_path = getPathInLocalCache ( key , offset ) ;
2022-01-13 11:57:56 +00:00
if ( fs : : exists ( cache_file_path ) )
{
try
{
fs : : remove ( cache_file_path ) ;
2022-03-01 17:12:34 +00:00
if ( is_initialized & & offsets . empty ( ) )
2022-02-18 15:38:23 +00:00
{
auto key_path = getPathInLocalCache ( key ) ;
files . erase ( key ) ;
if ( fs : : exists ( key_path ) )
fs : : remove ( key_path ) ;
}
2022-01-13 11:57:56 +00:00
}
catch ( . . . )
{
2022-02-18 15:38:23 +00:00
throw Exception ( ErrorCodes : : REMOTE_FS_OBJECT_CACHE_ERROR ,
2022-01-13 11:57:56 +00:00
" Removal of cached file failed. Key: {}, offset: {}, path: {}, error: {} " ,
keyToStr ( key ) , offset , cache_file_path , getCurrentExceptionMessage ( false ) ) ;
}
}
}
2022-02-18 15:38:23 +00:00
void LRUFileCache : : loadCacheInfoIntoMemory ( )
2022-01-13 11:57:56 +00:00
{
std : : lock_guard cache_lock ( mutex ) ;
Key key ;
2022-03-30 15:12:23 +00:00
UInt64 offset = 0 ;
size_t size = 0 ;
2022-01-13 11:57:56 +00:00
std : : vector < FileSegmentCell * > cells ;
/// cache_base_path / key_prefix / key / offset
fs : : directory_iterator key_prefix_it { cache_base_path } ;
for ( ; key_prefix_it ! = fs : : directory_iterator ( ) ; + + key_prefix_it )
{
fs : : directory_iterator key_it { key_prefix_it - > path ( ) } ;
for ( ; key_it ! = fs : : directory_iterator ( ) ; + + key_it )
{
2022-01-21 15:39:34 +00:00
key = unhexUInt < UInt128 > ( key_it - > path ( ) . filename ( ) . string ( ) . data ( ) ) ;
2022-01-13 11:57:56 +00:00
fs : : directory_iterator offset_it { key_it - > path ( ) } ;
for ( ; offset_it ! = fs : : directory_iterator ( ) ; + + offset_it )
{
bool parsed = tryParse < UInt64 > ( offset , offset_it - > path ( ) . filename ( ) ) ;
if ( ! parsed )
2022-01-23 16:51:18 +00:00
{
LOG_WARNING ( log , " Unexpected file: " , offset_it - > path ( ) . string ( ) ) ;
2022-02-15 09:11:33 +00:00
continue ; /// Or just remove? Some unexpected file.
2022-01-23 16:51:18 +00:00
}
2022-01-13 11:57:56 +00:00
size = offset_it - > file_size ( ) ;
2022-01-23 16:51:18 +00:00
if ( ! size )
{
fs : : remove ( offset_it - > path ( ) ) ;
continue ;
}
2022-01-13 11:57:56 +00:00
2022-01-23 16:51:18 +00:00
if ( tryReserve ( key , offset , size , cache_lock ) )
2022-01-13 11:57:56 +00:00
{
2022-01-23 16:51:18 +00:00
auto * cell = addCell ( key , offset , size , FileSegment : : State : : DOWNLOADED , cache_lock ) ;
if ( cell )
cells . push_back ( cell ) ;
2022-01-13 11:57:56 +00:00
}
else
{
LOG_WARNING ( log ,
" Cache capacity changed (max size: {}, available: {}), cached file `{}` does not fit in cache anymore (size: {}) " ,
2022-01-24 22:07:02 +00:00
max_size , availableSize ( ) , key_it - > path ( ) . string ( ) , size ) ;
2022-01-23 16:51:18 +00:00
fs : : remove ( offset_it - > path ( ) ) ;
2022-01-13 11:57:56 +00:00
}
}
}
}
/// Shuffle cells to have random order in LRUQueue as at startup all cells have the same priority.
pcg64 generator ( randomSeed ( ) ) ;
std : : shuffle ( cells . begin ( ) , cells . end ( ) , generator ) ;
for ( const auto & cell : cells )
2022-03-11 11:57:57 +00:00
{
/// Cell cache size changed and, for example, 1st file segment fits into cache
/// and 2nd file segment will fit only if first was evicted, then first will be removed and
/// cell is nullptr here.
if ( cell )
queue . splice ( queue . end ( ) , queue , * cell - > queue_iterator ) ;
}
2022-01-13 11:57:56 +00:00
}
/// Collects cache statistics: number of queue entries, available size, and how many queued cells
/// are in DOWNLOADED and in DOWNLOADING state (both are counts of cells, not bytes).
/// Throws REMOTE_FS_OBJECT_CACHE_ERROR if the queue references a cell that does not exist.
LRUFileCache::Stat LRUFileCache::getStat()
{
    std::lock_guard cache_lock(mutex);

    Stat stat
    {
        .size = queue.size(),
        .available = availableSize(),
        .downloaded_size = 0,
        .downloading_size = 0,
    };

    for (const auto & [key, offset] : queue)
    {
        const auto * cell = getCell(key, offset, cache_lock);
        if (cell == nullptr)
            throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
                " Cache became inconsistent. Key: {}, offset: {} ", keyToStr(key), offset);

        const FileSegment::State state = cell->file_segment->download_state;
        if (state == FileSegment::State::DOWNLOADED)
            ++stat.downloaded_size;
        else if (state == FileSegment::State::DOWNLOADING)
            ++stat.downloading_size;
    }

    return stat;
}
2022-01-22 22:56:24 +00:00
void LRUFileCache : : reduceSizeToDownloaded (
2022-02-23 10:12:14 +00:00
const Key & key , size_t offset ,
std : : lock_guard < std : : mutex > & cache_lock , std : : lock_guard < std : : mutex > & /* segment_lock */ )
2022-01-21 15:39:34 +00:00
{
2022-01-22 22:56:24 +00:00
/**
* In case file was partially downloaded and it ' s download cannot be continued
* because of no space left in cache , we need to be able to cut cell ' s size to downloaded_size .
*/
2022-01-21 15:39:34 +00:00
2022-01-22 22:56:24 +00:00
auto * cell = getCell ( key , offset , cache_lock ) ;
2022-01-21 15:39:34 +00:00
2022-01-22 22:56:24 +00:00
if ( ! cell )
2022-02-18 15:38:23 +00:00
throw Exception ( ErrorCodes : : REMOTE_FS_OBJECT_CACHE_ERROR , " No cell found for key: {}, offset: {} " , keyToStr ( key ) , offset ) ;
2022-01-21 15:39:34 +00:00
2022-01-22 22:56:24 +00:00
const auto & file_segment = cell - > file_segment ;
2022-01-21 15:39:34 +00:00
2022-01-22 22:56:24 +00:00
size_t downloaded_size = file_segment - > downloaded_size ;
if ( downloaded_size = = file_segment - > range ( ) . size ( ) )
2022-02-18 15:38:23 +00:00
throw Exception ( ErrorCodes : : REMOTE_FS_OBJECT_CACHE_ERROR ,
2022-01-22 22:56:24 +00:00
" Nothing to reduce, file segment fully downloaded, key: {}, offset: {} " , keyToStr ( key ) , offset ) ;
2022-01-13 11:57:56 +00:00
2022-01-22 22:56:24 +00:00
cell - > file_segment = std : : make_shared < FileSegment > ( offset , downloaded_size , key , this , FileSegment : : State : : DOWNLOADED ) ;
2022-01-21 15:39:34 +00:00
}
2022-01-23 16:51:18 +00:00
bool LRUFileCache : : isLastFileSegmentHolder (
2022-02-23 10:12:14 +00:00
const Key & key , size_t offset ,
std : : lock_guard < std : : mutex > & cache_lock , std : : lock_guard < std : : mutex > & /* segment_lock */ )
2022-01-21 15:39:34 +00:00
{
2022-01-23 16:51:18 +00:00
auto * cell = getCell ( key , offset , cache_lock ) ;
if ( ! cell )
throw Exception ( ErrorCodes : : LOGICAL_ERROR , " No cell found for key: {}, offset: {} " , keyToStr ( key ) , offset ) ;
2022-02-15 09:11:33 +00:00
/// The caller of this method is last file segment holder if use count is 2 (the second pointer is cache itself)
2022-01-23 16:51:18 +00:00
return cell - > file_segment . use_count ( ) = = 2 ;
2022-01-13 11:57:56 +00:00
}
2022-04-07 16:46:46 +00:00
/// Returns one snapshot (as produced by FileSegment::getSnapshot) for every cell of every key.
FileSegments LRUFileCache::getSnapshot() const
{
    std::lock_guard cache_lock(mutex);

    FileSegments snapshot;
    for (const auto & key_and_cells : files)
    {
        const auto & cells_by_offset = key_and_cells.second;
        for (const auto & offset_and_cell : cells_by_offset)
        {
            const auto & cell = offset_and_cell.second;
            snapshot.push_back(FileSegment::getSnapshot(cell.file_segment, cache_lock));
        }
    }

    return snapshot;
}
2022-03-23 12:01:18 +00:00
std : : vector < String > LRUFileCache : : tryGetCachePaths ( const Key & key )
{
std : : lock_guard cache_lock ( mutex ) ;
std : : vector < String > cache_paths ;
2022-03-30 11:47:44 +00:00
2022-03-23 12:01:18 +00:00
const auto & cells_by_offset = files [ key ] ;
for ( const auto & [ offset , cell ] : cells_by_offset )
2022-03-23 17:11:52 +00:00
{
2022-03-23 12:01:18 +00:00
if ( cell . file_segment - > state ( ) = = FileSegment : : State : : DOWNLOADED )
cache_paths . push_back ( getPathInLocalCache ( key , offset ) ) ;
2022-03-23 17:11:52 +00:00
}
2022-03-23 12:01:18 +00:00
return cache_paths ;
}
2022-01-22 22:56:24 +00:00
/// A cell can be created around a file segment in EMPTY, DOWNLOADING or DOWNLOADED state;
/// any other state throws REMOTE_FS_OBJECT_CACHE_ERROR.
/// Only a DOWNLOADED segment is put into the LRU queue right away. For the other two states
/// no queue entry is created here.
LRUFileCache::FileSegmentCell::FileSegmentCell(FileSegmentPtr file_segment_, LRUQueue & queue_)
    : file_segment(file_segment_)
{
    const FileSegment::State state = file_segment->download_state;

    if (state == FileSegment::State::DOWNLOADED)
    {
        queue_iterator = queue_.insert(queue_.end(), getKeyAndOffset());
    }
    else if (state != FileSegment::State::EMPTY && state != FileSegment::State::DOWNLOADING)
    {
        throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
            " Can create cell with either EMPTY, DOWNLOADED, DOWNLOADING state, got: {} ",
            FileSegment::stateToString(file_segment->download_state));
    }
}
2022-03-21 18:48:13 +00:00
/// Locks the cache and returns a textual dump of all file segments stored for `key`.
String LRUFileCache::dumpStructure(const Key & key)
{
    std::lock_guard cache_lock(mutex);

    String structure = dumpStructureImpl(key, cache_lock);
    return structure;
}
2022-01-21 15:39:34 +00:00
2022-03-21 18:48:13 +00:00
/// Returns getInfoForLog() of every file segment stored for `key`, one per line.
/// Returns an empty string for an unknown key; the lookup does not add the key to `files`.
String LRUFileCache::dumpStructureImpl(const Key & key, std::lock_guard<std::mutex> & /* cache_lock */)
{
    WriteBufferFromOwnString result;

    /// find() instead of operator[]: a diagnostic dump must not insert an empty entry for an unknown key.
    auto key_it = files.find(key);
    if (key_it != files.end())
    {
        for (const auto & [offset, cell] : key_it->second)
            result << cell.file_segment->getInfoForLog() << " \n ";
    }

    return result.str();
}
void LRUFileCache : : assertCacheCorrectness ( const Key & key , std : : lock_guard < std : : mutex > & /* cache_lock */ )
{
const auto & cells_by_offset = files [ key ] ;
for ( const auto & [ _ , cell ] : cells_by_offset )
2022-01-21 15:39:34 +00:00
{
2022-03-21 18:48:13 +00:00
const auto & file_segment = cell . file_segment ;
file_segment - > assertCorrectness ( ) ;
2022-01-13 11:57:56 +00:00
}
2022-01-21 15:39:34 +00:00
}
2022-01-13 11:57:56 +00:00
}