2022-01-22 22:56:24 +00:00
# include "FileSegment.h"
2022-01-30 11:35:28 +00:00
# include <base/getThreadId.h>
2022-01-22 22:56:24 +00:00
# include <Common/FileCache.h>
2022-01-30 11:35:28 +00:00
# include <Common/hex.h>
2022-02-02 14:25:25 +00:00
# include <IO/WriteBufferFromString.h>
# include <IO/Operators.h>
2022-01-30 11:35:28 +00:00
# include <filesystem>
2022-01-22 22:56:24 +00:00
namespace DB
{
namespace ErrorCodes
{
    /// Error codes referenced by this translation unit (declared extern here).
    extern const int LOGICAL_ERROR;
    extern const int REMOTE_FS_OBJECT_CACHE_ERROR;
}
/// Creates a file segment for `key_` covering the byte range [offset_, offset_ + size_ - 1].
/// The initial state must be EMPTY, DOWNLOADED or DOWNLOADING; any other value throws
/// REMOTE_FS_OBJECT_CACHE_ERROR.
FileSegment::FileSegment(
        size_t offset_,
        size_t size_,
        const Key & key_,
        IFileCache * cache_,
        State download_state_)
    : segment_range(offset_, offset_ + size_ - 1)
    , download_state(download_state_)
    , file_key(key_)
    , cache(cache_)
#ifndef NDEBUG
    , log(&Poco::Logger::get(fmt::format(" FileSegment({}) : {} ", getHexUIntLowercase(key_), range().toString())))
#else
    , log(&Poco::Logger::get(" FileSegment "))
#endif
{
    if (download_state == State::EMPTY)
    {
        /// The segment is not in cache yet; somebody may later become its downloader
        /// by calling getOrSetDownloader(). Nothing to initialize here.
    }
    else if (download_state == State::DOWNLOADED)
    {
        /// Used when cache metadata is loaded on server startup or when the segment object
        /// is re-created by reduceSizeToDownloaded(): the whole range counts as downloaded.
        downloaded_size = size_;
        reserved_size = downloaded_size;
    }
    else if (download_state == State::DOWNLOADING)
    {
        /// Write-through caching: the creator is the downloader right away,
        /// so getOrSetDownloader() is not needed.
        downloader_id = getCallerId();
    }
    else
    {
        throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, " Can create cell with either EMPTY, DOWNLOADED, DOWNLOADING state ");
    }
}
2022-01-23 16:51:18 +00:00
/// Returns the current download state, read under the segment mutex.
FileSegment::State FileSegment::state() const
{
    std::lock_guard<std::mutex> lock(mutex);
    return download_state;
}
2022-02-18 15:38:23 +00:00
size_t FileSegment : : getDownloadOffset ( ) const
2022-01-22 22:56:24 +00:00
{
std : : lock_guard segment_lock ( mutex ) ;
2022-03-14 16:33:29 +00:00
return range ( ) . left + getDownloadedSize ( segment_lock ) ;
}
2022-04-07 16:46:46 +00:00
size_t FileSegment : : getDownloadedSize ( ) const
{
std : : lock_guard segment_lock ( mutex ) ;
return getDownloadedSize ( segment_lock ) ;
}
2022-03-14 16:33:29 +00:00
size_t FileSegment : : getDownloadedSize ( std : : lock_guard < std : : mutex > & /* segment_lock */ ) const
{
if ( download_state = = State : : DOWNLOADED )
return downloaded_size ;
std : : lock_guard download_lock ( download_mutex ) ;
return downloaded_size ;
2022-01-22 22:56:24 +00:00
}
/// Returns the identifier of the calling thread/query; see getCallerIdImpl().
String FileSegment::getCallerId()
{
    String caller_id = getCallerIdImpl();
    return caller_id;
}
2022-04-07 16:46:46 +00:00
/// Builds the caller identifier from the current query id and the thread id.
/// When the current thread is not initialized, has no query context, or has an empty
/// query id, the " None: " prefix is used in place of the query id.
String FileSegment::getCallerIdImpl()
{
    const bool has_query_id = CurrentThread::isInitialized()
        && CurrentThread::get().getQueryContext()
        && CurrentThread::getQueryId().size != 0;

    if (has_query_id)
        return CurrentThread::getQueryId().toString() + " : " + toString(getThreadId());

    return " None: " + toString(getThreadId());
}
/// Returns the id of the segment's downloader, making the caller the downloader when there
/// is none and the segment is EMPTY or PARTIALLY_DOWNLOADED.
/// Returns " None " when there is no downloader and the state does not allow downloading.
/// Throws REMOTE_FS_OBJECT_CACHE_ERROR if the segment is detached or if the caller already
/// is the downloader.
String FileSegment::getOrSetDownloader()
{
    std::lock_guard<std::mutex> lock(mutex);

    if (detached)
        throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, " Cannot set downloader for a detached file segment ");

    if (!downloader_id.empty())
    {
        /// Somebody already downloads the segment; the same caller must not ask twice.
        if (downloader_id == getCallerId())
            throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
                            " Attempt to set the same downloader for segment {} for the second time ", range().toString());

        return downloader_id;
    }

    assert(download_state != State::DOWNLOADING);

    const bool can_start_download = download_state == State::EMPTY
        || download_state == State::PARTIALLY_DOWNLOADED;
    if (!can_start_download)
        return " None ";

    downloader_id = getCallerId();
    download_state = State::DOWNLOADING;

    return downloader_id;
}
2022-03-06 19:33:07 +00:00
void FileSegment : : resetDownloader ( )
{
std : : lock_guard segment_lock ( mutex ) ;
if ( downloader_id . empty ( ) )
2022-03-09 09:36:52 +00:00
throw Exception ( ErrorCodes : : REMOTE_FS_OBJECT_CACHE_ERROR , " There is no downloader " ) ;
2022-03-06 19:33:07 +00:00
if ( getCallerId ( ) ! = downloader_id )
2022-03-09 09:36:52 +00:00
throw Exception ( ErrorCodes : : REMOTE_FS_OBJECT_CACHE_ERROR , " Downloader can be reset only by downloader " ) ;
resetDownloaderImpl ( segment_lock ) ;
}
2022-03-06 19:33:07 +00:00
2022-03-09 09:36:52 +00:00
void FileSegment : : resetDownloaderImpl ( std : : lock_guard < std : : mutex > & segment_lock )
{
2022-03-07 21:03:12 +00:00
if ( downloaded_size = = range ( ) . size ( ) )
setDownloaded ( segment_lock ) ;
else
download_state = State : : PARTIALLY_DOWNLOADED ;
2022-03-06 19:33:07 +00:00
downloader_id . clear ( ) ;
}
2022-01-26 18:43:23 +00:00
/// Returns a copy of the current downloader id (empty if there is no downloader),
/// read under the segment mutex.
String FileSegment::getDownloader() const
{
    std::lock_guard<std::mutex> lock(mutex);
    return downloader_id;
}
bool FileSegment : : isDownloader ( ) const
{
std : : lock_guard segment_lock ( mutex ) ;
return getCallerId ( ) = = downloader_id ;
}
2022-04-14 11:17:04 +00:00
bool FileSegment : : isDownloaderImpl ( std : : lock_guard < std : : mutex > & /* segment+_lock */ ) const
{
return getCallerId ( ) = = downloader_id ;
}
2022-01-26 18:43:23 +00:00
/// Returns the remote file reader stored in the segment (may be null).
/// Only the downloader may call this; otherwise REMOTE_FS_OBJECT_CACHE_ERROR is thrown.
FileSegment::RemoteFileReaderPtr FileSegment::getRemoteFileReader()
{
    const bool caller_is_downloader = isDownloader();
    if (!caller_is_downloader)
        throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, " Only downloader can use remote filesystem file reader ");

    return remote_file_reader;
}
void FileSegment : : setRemoteFileReader ( RemoteFileReaderPtr remote_file_reader_ )
{
if ( ! isDownloader ( ) )
2022-02-18 15:38:23 +00:00
throw Exception ( ErrorCodes : : REMOTE_FS_OBJECT_CACHE_ERROR , " Only downloader can use remote filesystem file reader " ) ;
2022-01-26 18:43:23 +00:00
if ( remote_file_reader )
throw Exception ( ErrorCodes : : LOGICAL_ERROR , " Remote file reader already exists " ) ;
remote_file_reader = remote_file_reader_ ;
}
2022-03-17 19:29:07 +00:00
void FileSegment : : resetRemoteFileReader ( )
{
if ( ! isDownloader ( ) )
throw Exception ( ErrorCodes : : REMOTE_FS_OBJECT_CACHE_ERROR , " Only downloader can use remote filesystem file reader " ) ;
if ( ! remote_file_reader )
throw Exception ( ErrorCodes : : LOGICAL_ERROR , " Remote file reader does not exist " ) ;
remote_file_reader . reset ( ) ;
}
2022-03-16 12:27:58 +00:00
void FileSegment : : write ( const char * from , size_t size , size_t offset_ )
2022-01-22 22:56:24 +00:00
{
2022-01-23 16:51:18 +00:00
if ( ! size )
2022-02-18 15:38:23 +00:00
throw Exception ( ErrorCodes : : REMOTE_FS_OBJECT_CACHE_ERROR , " Writing zero size is not allowed " ) ;
2022-01-23 16:51:18 +00:00
2022-01-24 22:07:02 +00:00
if ( availableSize ( ) < size )
2022-01-22 22:56:24 +00:00
throw Exception (
2022-02-18 15:38:23 +00:00
ErrorCodes : : REMOTE_FS_OBJECT_CACHE_ERROR ,
2022-01-24 22:07:02 +00:00
" Not enough space is reserved. Available: {}, expected: {} " , availableSize ( ) , size ) ;
2022-01-22 22:56:24 +00:00
2022-01-23 16:51:18 +00:00
if ( ! isDownloader ( ) )
2022-02-18 15:38:23 +00:00
throw Exception ( ErrorCodes : : REMOTE_FS_OBJECT_CACHE_ERROR ,
2022-01-26 18:43:23 +00:00
" Only downloader can do the downloading. (CallerId: {}, DownloaderId: {}) " ,
getCallerId ( ) , downloader_id ) ;
2022-01-23 16:51:18 +00:00
2022-03-17 16:50:51 +00:00
if ( downloaded_size = = range ( ) . size ( ) )
throw Exception ( ErrorCodes : : REMOTE_FS_OBJECT_CACHE_ERROR ,
" Attempt to write {} bytes to offset: {}, but current file segment is already fully downloaded " ,
size , offset_ ) ;
2022-03-14 16:33:29 +00:00
auto download_offset = range ( ) . left + downloaded_size ;
if ( offset_ ! = download_offset )
throw Exception ( ErrorCodes : : REMOTE_FS_OBJECT_CACHE_ERROR ,
2022-03-17 16:50:51 +00:00
" Attempt to write {} bytes to offset: {}, but current download offset is {} " ,
2022-03-14 16:33:29 +00:00
size , offset_ , download_offset ) ;
2022-03-16 12:27:58 +00:00
2022-04-11 15:51:49 +00:00
assertNotDetached ( ) ;
2022-01-26 18:43:23 +00:00
if ( ! cache_writer )
2022-01-22 22:56:24 +00:00
{
2022-03-14 16:33:29 +00:00
if ( downloaded_size > 0 )
2022-03-17 16:50:51 +00:00
throw Exception ( ErrorCodes : : LOGICAL_ERROR ,
2022-03-18 09:16:06 +00:00
" Cache writer was finalized (downloaded size: {}, state: {}) " ,
2022-03-17 16:50:51 +00:00
downloaded_size , stateToString ( download_state ) ) ;
2022-03-14 16:33:29 +00:00
2022-02-18 15:38:23 +00:00
auto download_path = cache - > getPathInLocalCache ( key ( ) , offset ( ) ) ;
2022-01-26 18:43:23 +00:00
cache_writer = std : : make_unique < WriteBufferFromFile > ( download_path ) ;
2022-01-22 22:56:24 +00:00
}
2022-02-02 14:25:25 +00:00
try
{
cache_writer - > write ( from , size ) ;
2022-03-14 16:33:29 +00:00
std : : lock_guard download_lock ( download_mutex ) ;
2022-02-02 14:25:25 +00:00
cache_writer - > next ( ) ;
2022-03-14 16:33:29 +00:00
downloaded_size + = size ;
2022-02-02 14:25:25 +00:00
}
2022-03-17 19:29:07 +00:00
catch ( Exception & e )
2022-02-02 14:25:25 +00:00
{
2022-02-18 15:38:23 +00:00
std : : lock_guard segment_lock ( mutex ) ;
2022-04-07 16:46:46 +00:00
wrapWithCacheInfo ( e , " while writing into cache " , segment_lock ) ;
2022-03-17 19:29:07 +00:00
2022-04-07 16:46:46 +00:00
setDownloadFailed ( segment_lock ) ;
2022-03-10 09:56:48 +00:00
2022-03-16 12:27:58 +00:00
cv . notify_all ( ) ;
2022-02-18 15:38:23 +00:00
2022-02-02 14:25:25 +00:00
throw ;
}
2022-03-16 13:29:21 +00:00
2022-03-14 16:33:29 +00:00
assert ( getDownloadOffset ( ) = = offset_ + size ) ;
2022-01-22 22:56:24 +00:00
}
2022-04-01 14:45:15 +00:00
void FileSegment : : writeInMemory ( const char * from , size_t size )
{
if ( ! size )
2022-04-07 16:46:46 +00:00
throw Exception ( ErrorCodes : : REMOTE_FS_OBJECT_CACHE_ERROR , " Attempt to write zero size cache file " ) ;
2022-04-01 14:45:15 +00:00
if ( availableSize ( ) < size )
throw Exception (
ErrorCodes : : REMOTE_FS_OBJECT_CACHE_ERROR ,
" Not enough space is reserved. Available: {}, expected: {} " , availableSize ( ) , size ) ;
2022-04-11 15:51:49 +00:00
assertNotDetached ( ) ;
2022-04-01 14:45:15 +00:00
std : : lock_guard segment_lock ( mutex ) ;
if ( cache_writer )
throw Exception ( ErrorCodes : : LOGICAL_ERROR , " Cache writer already initialized " ) ;
auto download_path = cache - > getPathInLocalCache ( key ( ) , offset ( ) ) ;
cache_writer = std : : make_unique < WriteBufferFromFile > ( download_path , size + 1 ) ;
try
{
cache_writer - > write ( from , size ) ;
}
2022-04-07 16:46:46 +00:00
catch ( Exception & e )
2022-04-01 14:45:15 +00:00
{
2022-04-07 16:46:46 +00:00
wrapWithCacheInfo ( e , " while writing into cache " , segment_lock ) ;
2022-04-01 14:45:15 +00:00
2022-04-07 16:46:46 +00:00
setDownloadFailed ( segment_lock ) ;
2022-02-18 15:38:23 +00:00
2022-03-16 12:27:58 +00:00
cv . notify_all ( ) ;
2022-02-02 14:25:25 +00:00
throw ;
}
2022-04-01 14:45:15 +00:00
}
2022-03-16 13:29:21 +00:00
2022-04-01 14:45:15 +00:00
size_t FileSegment : : finalizeWrite ( )
{
std : : lock_guard segment_lock ( mutex ) ;
if ( ! cache_writer )
throw Exception ( ErrorCodes : : LOGICAL_ERROR , " Cache writer not initialized " ) ;
size_t size = cache_writer - > offset ( ) ;
if ( size = = 0 )
2022-04-11 15:51:49 +00:00
throw Exception ( ErrorCodes : : REMOTE_FS_OBJECT_CACHE_ERROR , " Writing zero size is not allowed " ) ;
assertNotDetached ( ) ;
2022-04-01 14:45:15 +00:00
try
{
cache_writer - > next ( ) ;
}
2022-04-07 16:46:46 +00:00
catch ( Exception & e )
2022-04-01 14:45:15 +00:00
{
2022-04-07 16:46:46 +00:00
wrapWithCacheInfo ( e , " while writing into cache " , segment_lock ) ;
2022-04-01 14:45:15 +00:00
2022-04-07 16:46:46 +00:00
setDownloadFailed ( segment_lock ) ;
cv . notify_all ( ) ;
2022-04-01 14:45:15 +00:00
throw ;
}
downloaded_size + = size ;
if ( downloaded_size ! = range ( ) . size ( ) )
2022-04-07 16:46:46 +00:00
throw Exception ( ErrorCodes : : LOGICAL_ERROR , " Expected downloaded size to equal file segment size ({} == {}) " , downloaded_size, range().size()) ;
setDownloaded ( segment_lock ) ;
2022-04-01 14:45:15 +00:00
return size ;
2022-01-22 22:56:24 +00:00
}
/// Waits (at most 60 seconds, a single condition-variable wait) while another caller is
/// downloading the segment and returns the state observed afterwards.
/// Returns immediately when there is no downloader or the state is not DOWNLOADING.
/// Throws REMOTE_FS_OBJECT_CACHE_ERROR if there is a downloader but the state is EMPTY.
/// The returned state may still be DOWNLOADING (timeout or wake-up without state change).
FileSegment::State FileSegment::wait()
{
    std::unique_lock<std::mutex> lock(mutex);

    if (downloader_id.empty())
        return download_state;

    if (download_state == State::EMPTY)
        throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, " Cannot wait on a file segment with empty state ");

    if (download_state != State::DOWNLOADING)
        return download_state;

    LOG_TEST(log, " {} waiting on: {}, current downloader: {} ", getCallerId(), range().toString(), downloader_id);

    assert(!downloader_id.empty());
    assert(downloader_id != getCallerId());

    cv.wait_for(lock, std::chrono::seconds(60));

    return download_state;
}
bool FileSegment : : reserve ( size_t size )
{
if ( ! size )
2022-02-18 15:38:23 +00:00
throw Exception ( ErrorCodes : : REMOTE_FS_OBJECT_CACHE_ERROR , " Zero space reservation is not allowed " ) ;
2022-01-22 22:56:24 +00:00
2022-04-11 15:51:49 +00:00
assertNotDetached ( ) ;
2022-02-24 14:20:51 +00:00
{
std : : lock_guard segment_lock ( mutex ) ;
2022-01-22 22:56:24 +00:00
2022-03-14 16:33:29 +00:00
auto caller_id = getCallerId ( ) ;
if ( downloader_id ! = caller_id )
throw Exception ( ErrorCodes : : REMOTE_FS_OBJECT_CACHE_ERROR , " Space can be reserved only by downloader (current: {}, expected: {}) " , caller_id, downloader_id) ;
2022-02-24 14:20:51 +00:00
if ( downloaded_size + size > range ( ) . size ( ) )
throw Exception ( ErrorCodes : : REMOTE_FS_OBJECT_CACHE_ERROR ,
" Attempt to reserve space too much space ({}) for file segment with range: {} (downloaded size: {}) " ,
size , range ( ) . toString ( ) , downloaded_size ) ;
2022-01-22 22:56:24 +00:00
2022-02-24 14:20:51 +00:00
assert ( reserved_size > = downloaded_size ) ;
}
2022-01-22 22:56:24 +00:00
/**
* It is possible to have downloaded_size < reserved_size when reserve is called
* in case previous downloader did not fully download current file_segment
* and the caller is going to continue ;
*/
size_t free_space = reserved_size - downloaded_size ;
size_t size_to_reserve = size - free_space ;
2022-02-18 15:38:23 +00:00
std : : lock_guard cache_lock ( cache - > mutex ) ;
2022-01-23 16:51:18 +00:00
bool reserved = cache - > tryReserve ( key ( ) , offset ( ) , size_to_reserve , cache_lock ) ;
2022-01-22 22:56:24 +00:00
if ( reserved )
reserved_size + = size ;
return reserved ;
}
2022-02-23 10:12:14 +00:00
void FileSegment : : setDownloaded ( std : : lock_guard < std : : mutex > & /* segment_lock */ )
{
2022-04-14 11:17:04 +00:00
if ( is_downloaded )
return ;
2022-02-23 10:12:14 +00:00
download_state = State : : DOWNLOADED ;
is_downloaded = true ;
2022-04-07 16:46:46 +00:00
downloader_id . clear ( ) ;
if ( cache_writer )
{
cache_writer - > finalize ( ) ;
cache_writer . reset ( ) ;
remote_file_reader . reset ( ) ;
}
}
void FileSegment : : setDownloadFailed ( std : : lock_guard < std : : mutex > & /* segment_lock */ )
{
download_state = State : : PARTIALLY_DOWNLOADED_NO_CONTINUATION ;
downloader_id . clear ( ) ;
2022-02-23 10:12:14 +00:00
if ( cache_writer )
{
cache_writer - > finalize ( ) ;
cache_writer . reset ( ) ;
2022-03-11 11:57:57 +00:00
remote_file_reader . reset ( ) ;
2022-02-23 10:12:14 +00:00
}
}
2022-01-26 18:43:23 +00:00
void FileSegment : : completeBatchAndResetDownloader ( )
2022-01-22 22:56:24 +00:00
{
2022-02-23 13:43:40 +00:00
std : : lock_guard segment_lock ( mutex ) ;
2022-04-14 11:17:04 +00:00
if ( ! isDownloaderImpl ( segment_lock ) )
2022-01-22 22:56:24 +00:00
{
2022-02-23 13:43:40 +00:00
cv . notify_all ( ) ;
2022-04-11 15:51:49 +00:00
throw Exception (
ErrorCodes : : REMOTE_FS_OBJECT_CACHE_ERROR ,
" File segment can be completed only by downloader ({} != {}) " ,
downloader_id , getCallerId ( ) ) ;
2022-02-23 13:43:40 +00:00
}
2022-01-22 22:56:24 +00:00
2022-03-09 09:36:52 +00:00
resetDownloaderImpl ( segment_lock ) ;
2022-02-23 13:43:40 +00:00
LOG_TEST ( log , " Complete batch. Current downloaded size: {} " , downloaded_size ) ;
2022-01-26 09:35:46 +00:00
cv . notify_all ( ) ;
}
2022-03-11 11:17:17 +00:00
void FileSegment : : complete ( State state )
2022-01-26 09:35:46 +00:00
{
2022-03-17 19:29:07 +00:00
std : : lock_guard cache_lock ( cache - > mutex ) ;
std : : lock_guard segment_lock ( mutex ) ;
2022-03-01 16:00:54 +00:00
2022-04-14 11:17:04 +00:00
bool is_downloader = isDownloaderImpl ( segment_lock ) ;
2022-03-17 19:29:07 +00:00
if ( ! is_downloader )
2022-02-23 13:43:40 +00:00
{
2022-03-17 19:29:07 +00:00
cv . notify_all ( ) ;
throw Exception ( ErrorCodes : : REMOTE_FS_OBJECT_CACHE_ERROR ,
" File segment can be completed only by downloader or downloader's FileSegmentsHodler " ) ;
}
2022-02-15 09:11:33 +00:00
2022-03-17 19:29:07 +00:00
if ( state ! = State : : DOWNLOADED
& & state ! = State : : PARTIALLY_DOWNLOADED
& & state ! = State : : PARTIALLY_DOWNLOADED_NO_CONTINUATION )
{
cv . notify_all ( ) ;
throw Exception ( ErrorCodes : : REMOTE_FS_OBJECT_CACHE_ERROR ,
" Cannot complete file segment with state: {} " , stateToString ( state ) ) ;
2022-03-01 16:00:54 +00:00
}
2022-04-14 11:17:04 +00:00
if ( state = = State : : DOWNLOADED )
setDownloaded ( segment_lock ) ;
2022-03-17 19:29:07 +00:00
download_state = state ;
2022-01-23 17:33:22 +00:00
2022-04-11 15:51:49 +00:00
assertNotDetached ( ) ;
2022-03-16 12:27:58 +00:00
try
{
2022-03-17 19:29:07 +00:00
completeImpl ( cache_lock , segment_lock ) ;
2022-03-16 12:27:58 +00:00
}
catch ( . . . )
{
2022-04-14 11:17:04 +00:00
if ( ! downloader_id . empty ( ) & & is_downloader )
2022-03-16 12:27:58 +00:00
downloader_id . clear ( ) ;
cv . notify_all ( ) ;
throw ;
2022-03-01 16:00:54 +00:00
}
2022-02-23 13:43:40 +00:00
2022-01-24 22:07:02 +00:00
cv . notify_all ( ) ;
}
2022-01-22 22:56:24 +00:00
2022-03-17 19:29:07 +00:00
void FileSegment : : complete ( std : : lock_guard < std : : mutex > & cache_lock )
2022-01-24 22:07:02 +00:00
{
2022-03-17 19:29:07 +00:00
std : : lock_guard segment_lock ( mutex ) ;
2022-01-23 16:51:18 +00:00
2022-03-17 19:29:07 +00:00
if ( download_state = = State : : SKIP_CACHE | | detached )
return ;
2022-01-22 22:56:24 +00:00
2022-04-14 11:17:04 +00:00
if ( isDownloaderImpl ( segment_lock )
& & download_state ! = State : : DOWNLOADED
& & getDownloadedSize ( segment_lock ) = = range ( ) . size ( ) )
{
2022-03-17 19:29:07 +00:00
setDownloaded ( segment_lock ) ;
2022-04-14 11:17:04 +00:00
}
2022-01-23 16:51:18 +00:00
2022-04-11 15:51:49 +00:00
assertNotDetached ( ) ;
2022-03-21 18:48:13 +00:00
if ( download_state = = State : : DOWNLOADING | | download_state = = State : : EMPTY )
{
/// Segment state can be changed from DOWNLOADING or EMPTY only if the caller is the
/// downloader or the only owner of the segment.
2022-01-22 22:56:24 +00:00
2022-04-14 11:17:04 +00:00
bool can_update_segment_state = isDownloaderImpl ( segment_lock )
2022-03-21 18:48:13 +00:00
| | cache - > isLastFileSegmentHolder ( key ( ) , offset ( ) , cache_lock , segment_lock ) ;
2022-01-23 17:33:22 +00:00
2022-03-21 18:48:13 +00:00
if ( can_update_segment_state )
2022-03-01 16:00:54 +00:00
download_state = State : : PARTIALLY_DOWNLOADED ;
}
2022-03-16 12:27:58 +00:00
try
{
2022-04-07 16:46:46 +00:00
completeImpl ( cache_lock , segment_lock ) ;
2022-03-16 12:27:58 +00:00
}
catch ( . . . )
{
2022-04-14 11:17:04 +00:00
if ( ! downloader_id . empty ( ) & & isDownloaderImpl ( segment_lock ) )
2022-03-16 12:27:58 +00:00
downloader_id . clear ( ) ;
cv . notify_all ( ) ;
throw ;
2022-03-01 16:00:54 +00:00
}
2022-01-24 22:07:02 +00:00
cv . notify_all ( ) ;
}
2022-01-23 16:51:18 +00:00
2022-04-07 16:46:46 +00:00
void FileSegment : : completeImpl ( std : : lock_guard < std : : mutex > & cache_lock , std : : lock_guard < std : : mutex > & segment_lock )
2022-01-24 22:07:02 +00:00
{
2022-03-22 09:39:58 +00:00
bool is_last_holder = cache - > isLastFileSegmentHolder ( key ( ) , offset ( ) , cache_lock , segment_lock ) ;
2022-02-15 10:27:44 +00:00
2022-03-28 23:59:53 +00:00
if ( is_last_holder
2022-03-22 09:39:58 +00:00
& & ( download_state = = State : : PARTIALLY_DOWNLOADED | | download_state = = State : : PARTIALLY_DOWNLOADED_NO_CONTINUATION ) )
2022-01-24 22:07:02 +00:00
{
2022-03-22 09:39:58 +00:00
size_t current_downloaded_size = getDownloadedSize ( segment_lock ) ;
if ( current_downloaded_size = = 0 )
{
download_state = State : : SKIP_CACHE ;
LOG_TEST ( log , " Remove cell {} (nothing downloaded) " , range ( ) . toString ( ) ) ;
cache - > remove ( key ( ) , offset ( ) , cache_lock , segment_lock ) ;
}
2022-03-29 12:08:24 +00:00
else
2022-03-22 09:39:58 +00:00
{
/**
* Only last holder of current file segment can resize the cell ,
* because there is an invariant that file segments returned to users
* in FileSegmentsHolder represent a contiguous range , so we can resize
* it only when nobody needs it .
*/
LOG_TEST ( log , " Resize cell {} to downloaded: {} " , range ( ) . toString ( ) , current_downloaded_size ) ;
cache - > reduceSizeToDownloaded ( key ( ) , offset ( ) , cache_lock , segment_lock ) ;
}
2022-01-23 16:51:18 +00:00
2022-03-29 12:08:24 +00:00
detached = true ;
2022-01-23 16:51:18 +00:00
2022-03-22 09:39:58 +00:00
if ( cache_writer )
2022-01-24 22:07:02 +00:00
{
2022-03-22 09:39:58 +00:00
cache_writer - > finalize ( ) ;
cache_writer . reset ( ) ;
remote_file_reader . reset ( ) ;
2022-01-24 22:07:02 +00:00
}
2022-01-22 22:56:24 +00:00
}
2022-04-14 11:17:04 +00:00
if ( ! downloader_id . empty ( ) & & ( isDownloaderImpl ( segment_lock ) | | is_last_holder ) )
2022-01-26 09:35:46 +00:00
{
2022-01-30 11:35:28 +00:00
LOG_TEST ( log , " Clearing downloader id: {}, current state: {} " , downloader_id , stateToString ( download_state ) ) ;
2022-01-24 22:07:02 +00:00
downloader_id . clear ( ) ;
2022-01-26 09:35:46 +00:00
}
2022-01-24 22:07:02 +00:00
2022-03-21 18:48:13 +00:00
assertCorrectnessImpl ( segment_lock ) ;
2022-01-22 22:56:24 +00:00
}
2022-02-02 14:25:25 +00:00
/// Returns a one-line description of the segment for logs, built under the segment mutex.
String FileSegment::getInfoForLog() const
{
    std::lock_guard<std::mutex> lock(mutex);
    return getInfoForLogImpl(lock);
}
2022-02-02 14:25:25 +00:00
2022-03-17 17:29:31 +00:00
/// Formats range, state, downloaded size, downloader id and caller id into a single string.
/// The caller must hold the segment mutex.
String FileSegment::getInfoForLogImpl(std::lock_guard<std::mutex> & segment_lock) const
{
    WriteBufferFromOwnString out;

    out << " File segment: " << range().toString() << " , "
        << " state: " << download_state << " , "
        << " downloaded size: " << getDownloadedSize(segment_lock) << " , "
        << " downloader id: " << downloader_id << " , "
        << " caller id: " << getCallerId();

    return out.str();
}
2022-04-07 16:46:46 +00:00
void FileSegment : : wrapWithCacheInfo ( Exception & e , const String & message , std : : lock_guard < std : : mutex > & segment_lock ) const
{
e . addMessage ( fmt : : format ( " {}, current cache state: {} " , message , getInfoForLogImpl ( segment_lock ) ) ) ;
}
2022-01-26 18:43:23 +00:00
/// Returns the textual name of a segment state. Every enumerator is handled in the switch,
/// so the code after it is marked as unreachable.
String FileSegment::stateToString(FileSegment::State state)
{
    switch (state)
    {
        case State::EMPTY:
            return " EMPTY ";
        case State::DOWNLOADING:
            return " DOWNLOADING ";
        case State::DOWNLOADED:
            return " DOWNLOADED ";
        case State::PARTIALLY_DOWNLOADED:
            return " PARTIALLY DOWNLOADED ";
        case State::PARTIALLY_DOWNLOADED_NO_CONTINUATION:
            return " PARTIALLY DOWNLOADED NO CONTINUATION ";
        case State::SKIP_CACHE:
            return " SKIP_CACHE ";
    }
    __builtin_unreachable();
}
2022-03-21 18:48:13 +00:00
void FileSegment : : assertCorrectness ( ) const
{
std : : lock_guard segment_lock ( mutex ) ;
assertCorrectnessImpl ( segment_lock ) ;
}
void FileSegment : : assertCorrectnessImpl ( std : : lock_guard < std : : mutex > & /* segment_lock */ ) const
{
assert ( downloader_id . empty ( ) = = ( download_state ! = FileSegment : : State : : DOWNLOADING ) ) ;
assert ( ! downloader_id . empty ( ) = = ( download_state = = FileSegment : : State : : DOWNLOADING ) ) ;
assert ( download_state ! = FileSegment : : State : : DOWNLOADED | | std : : filesystem : : file_size ( cache - > getPathInLocalCache ( key ( ) , offset ( ) ) ) > 0 ) ;
}
2022-04-11 15:51:49 +00:00
void FileSegment : : assertNotDetached ( ) const
{
if ( detached )
throw Exception ( ErrorCodes : : LOGICAL_ERROR , " Operation not allowed, file segment is detached " ) ;
}
2022-04-07 23:58:55 +00:00
/// Creates a standalone copy of `file_segment` that is not attached to any cache
/// (its cache pointer is null). The copy carries the source's range, key, hits count,
/// reference count (use_count of the passed pointer), downloaded size and state.
/// The caller must hold the cache mutex.
FileSegmentPtr FileSegment::getSnapshot(const FileSegmentPtr & file_segment, std::lock_guard<std::mutex> & /* cache_lock */)
{
    /// Constructed as EMPTY first; the real state is copied below.
    auto copy = std::make_shared<FileSegment>(
        file_segment->offset(),
        file_segment->range().size(),
        file_segment->key(),
        nullptr,
        State::EMPTY);

    copy->hits_count = file_segment->getHitsCount();
    copy->ref_count = file_segment.use_count();
    copy->downloaded_size = file_segment->getDownloadedSize();
    copy->download_state = file_segment->state();

    return copy;
}
2022-03-17 19:29:07 +00:00
/// Completes every file segment that is still held and removes it from the holder.
///
/// Detached segments are simply dropped. For the others complete() is called under the
/// cache mutex and the segment is erased from the list under the same lock. Exceptions from
/// complete() are logged and never leave the destructor; the failed segment is skipped.
FileSegmentsHolder::~FileSegmentsHolder()
{
    /// In CacheableReadBufferFromRemoteFS file segment's downloader removes file segments from
    /// FileSegmentsHolder right after calling file_segment->complete(), so on destruction here
    /// remain only uncompleted file segments.

    IFileCache * cache = nullptr;

    for (auto file_segment_it = file_segments.begin(); file_segment_it != file_segments.end();)
    {
        auto current_file_segment_it = file_segment_it;
        auto & file_segment = *current_file_segment_it;

        if (!cache)
            cache = file_segment->cache;

        if (file_segment->detached)
        {
            /// This file segment is not owned by cache, so it will be destructed
            /// at this point, therefore no completion required.
            assert(file_segment->state() == FileSegment::State::EMPTY);
            file_segment_it = file_segments.erase(current_file_segment_it);
            continue;
        }

        try
        {
            /// File segment pointer must be reset right after calling complete() and
            /// under the same mutex, because complete() checks for segment pointers.
            std::lock_guard cache_lock(cache->mutex);

            file_segment->complete(cache_lock);

            file_segment_it = file_segments.erase(current_file_segment_it);
        }
        catch (...)
        {
            tryLogCurrentException(__PRETTY_FUNCTION__);
            assert(false);

            /// Move on to the next segment. Without this the iterator would stay on the
            /// failed element and, when assertions are disabled, the loop would retry it
            /// forever. The skipped segment is released when the list itself is destroyed.
            ++file_segment_it;
        }
    }
}
2022-02-12 22:20:05 +00:00
/// Returns the ranges of all held file segments joined with " , ".
String FileSegmentsHolder::toString()
{
    String joined_ranges;

    for (const auto & segment : file_segments)
    {
        const bool needs_separator = !joined_ranges.empty();
        if (needs_separator)
            joined_ranges += " , ";

        joined_ranges += segment->range().toString();
    }

    return joined_ranges;
}
2022-01-22 22:56:24 +00:00
}