2021-02-16 21:33:02 +00:00
# pragma once
2021-02-21 12:50:55 +00:00
# if defined(__linux__) || defined(__FreeBSD__)
2021-02-16 21:33:02 +00:00
# include <chrono>
# include <pcg_random.hpp>
# include <filesystem>
# include <city.h>
# include <fcntl.h>
2021-02-27 16:04:32 +00:00
# include <absl/container/flat_hash_map.h>
# include <absl/container/flat_hash_set.h>
2021-02-16 21:33:02 +00:00
# include <common/unaligned.h>
2021-02-27 16:04:32 +00:00
# include <Common/Stopwatch.h>
2021-02-16 21:33:02 +00:00
# include <Common/randomSeed.h>
# include <Common/Arena.h>
# include <Common/ArenaWithFreeLists.h>
# include <Common/MemorySanitizer.h>
2021-03-18 09:55:17 +00:00
# include <Common/HashTable/HashMap.h>
2021-02-16 21:33:02 +00:00
# include <IO/AIO.h>
# include <Dictionaries/DictionaryStructure.h>
# include <Dictionaries/ICacheDictionaryStorage.h>
2021-02-17 18:19:04 +00:00
# include <Dictionaries/DictionaryHelpers.h>
2021-02-16 21:33:02 +00:00
namespace ProfileEvents
{
    /// Profile events incremented by SSDCacheFileBuffer when the cache file is opened and written with AIO.
    extern const Event FileOpen;
    extern const Event WriteBufferAIOWrite;
    extern const Event WriteBufferAIOWriteBytes;
}
namespace DB
{
namespace ErrorCodes
{
    /// Error codes thrown by the SSD cache storage (kept in alphabetical order).
    extern const int AIO_READ_ERROR;
    extern const int AIO_WRITE_ERROR;
    extern const int CANNOT_ALLOCATE_MEMORY;
    extern const int CANNOT_CREATE_DIRECTORY;
    extern const int CANNOT_FSYNC;
    extern const int CANNOT_IO_GETEVENTS;
    extern const int CANNOT_IO_SUBMIT;
    extern const int CANNOT_OPEN_FILE;
    extern const int CORRUPTED_DATA;
    extern const int FILE_DOESNT_EXIST;
    extern const int NOT_IMPLEMENTED;
    extern const int UNSUPPORTED_METHOD;
}
/// Settings of SSDCacheDictionaryStorage.
/// NOTE: member order is part of the interface (the struct is aggregate-initialized by callers).
struct SSDCacheDictionaryStorageConfiguration
{
    const size_t strict_max_lifetime_seconds;
    const DictionaryLifetime lifetime;

    /// Path of the cache file; SSDCacheFileBuffer appends ".bin" to it.
    const std::string file_path;

    /// Upper limit for the number of memory buffer partitions.
    const size_t max_partitions_count;

    /// Size of a single SSDCacheBlock in bytes.
    const size_t block_size;

    /// Number of blocks preallocated in the file for each partition.
    const size_t file_blocks_size;

    /// Not used in the visible part of this file — presumably number of blocks read at once; TODO confirm.
    const size_t read_buffer_blocks_size;

    /// Number of blocks in each memory buffer partition.
    const size_t write_buffer_blocks_size;
};
/** Simple Key is serialized in block with following structure
        key     | data_size | data
        8 bytes | 8 bytes   | data_size bytes

    Complex Key is serialized in block with following structure
        key_size | key_data       | data_size | data
        8 bytes  | key_size bytes | 8 bytes   | data_size bytes
  */
template <typename TKeyType>
struct SSDCacheKey final
{
    using KeyType = TKeyType;

    /// Stores the key together with a non-owning pointer to its serialized data.
    /// The caller must keep the memory behind data_ alive while this key is used.
    SSDCacheKey(KeyType key_, size_t size_, const char * data_)
        : key{key_}
        , size{size_}
        , data{data_}
    {}

    /// Key value
    KeyType key;

    /// Size of serialized data in bytes
    size_t size;

    /// Serialized data, not owned
    const char * data;
};
/// Key representations for simple (UInt64) and complex (serialized StringRef) dictionary keys.
using SSDCacheSimpleKey = SSDCacheKey<UInt64>;
using SSDCacheComplexKey = SSDCacheKey<StringRef>;
/** Block is serialized with following structure
check_sum | keys_size | [ keys ]
8 bytes | 8 bytes |
*/
class SSDCacheBlock final
{
static constexpr size_t block_header_check_sum_size = sizeof ( size_t ) ;
static constexpr size_t block_header_keys_size = sizeof ( size_t ) ;
public :
/// Block header size
static constexpr size_t block_header_size = block_header_check_sum_size + block_header_keys_size ;
explicit SSDCacheBlock ( size_t block_size_ )
: block_size ( block_size_ )
{ }
2021-03-04 14:34:39 +00:00
/// Checks if simple key can be written in empty block with block_size
2021-02-16 21:33:02 +00:00
static bool canBeWrittenInEmptyBlock ( SSDCacheSimpleKey & simple_key , size_t block_size )
{
static constexpr size_t simple_key_size = sizeof ( simple_key . key ) ;
return ( block_header_size + simple_key_size + sizeof ( simple_key . size ) + simple_key . size ) < = block_size ;
}
2021-03-04 14:34:39 +00:00
/// Checks if complex key can be written in empty block with block_size
2021-02-16 21:33:02 +00:00
static bool canBeWrittenInEmptyBlock ( SSDCacheComplexKey & complex_key , size_t block_size )
{
StringRef & key = complex_key . key ;
size_t complex_key_size = sizeof ( key . size ) + key . size ;
return ( block_header_size + complex_key_size + sizeof ( complex_key . size ) + complex_key . size ) < = block_size ;
}
/// Reset block with new block_data
/// block_data must be filled with zeroes if it is new block
2021-03-18 09:55:17 +00:00
inline void reset ( char * new_block_data )
2021-02-16 21:33:02 +00:00
{
block_data = new_block_data ;
current_block_offset = block_header_size ;
keys_size = unalignedLoad < size_t > ( new_block_data + block_header_check_sum_size ) ;
}
/// Check if it is enough place to write key in block
2021-03-18 09:55:17 +00:00
inline bool enoughtPlaceToWriteKey ( const SSDCacheSimpleKey & cache_key ) const
2021-02-16 21:33:02 +00:00
{
return ( current_block_offset + ( sizeof ( cache_key . key ) + sizeof ( cache_key . size ) + cache_key . size ) ) < = block_size ;
}
2021-03-04 14:34:39 +00:00
/// Check if it is enough place to write key in block
2021-03-18 09:55:17 +00:00
inline bool enoughtPlaceToWriteKey ( const SSDCacheComplexKey & cache_key ) const
2021-02-16 21:33:02 +00:00
{
const StringRef & key = cache_key . key ;
size_t complex_key_size = sizeof ( key . size ) + key . size ;
return ( current_block_offset + ( complex_key_size + sizeof ( cache_key . size ) + cache_key . size ) ) < = block_size ;
}
/// Write key and returns offset in ssd cache block where data is written
2021-02-17 21:42:51 +00:00
/// It is client responsibility to check if there is enough place in block to write key
/// Returns true if key was written and false if there was not enough place to write key
2021-03-18 09:55:17 +00:00
inline bool writeKey ( const SSDCacheSimpleKey & cache_key , size_t & offset_in_block )
2021-02-16 21:33:02 +00:00
{
assert ( cache_key . size > 0 ) ;
if ( ! enoughtPlaceToWriteKey ( cache_key ) )
return false ;
char * current_block_offset_data = block_data + current_block_offset ;
/// Write simple key
memcpy ( reinterpret_cast < void * > ( current_block_offset_data ) , reinterpret_cast < const void * > ( & cache_key . key ) , sizeof ( cache_key . key ) ) ;
current_block_offset_data + = sizeof ( cache_key . key ) ;
current_block_offset + = sizeof ( cache_key . key ) ;
/// Write serialized columns size
memcpy ( reinterpret_cast < void * > ( current_block_offset_data ) , reinterpret_cast < const void * > ( & cache_key . size ) , sizeof ( cache_key . size ) ) ;
current_block_offset_data + = sizeof ( cache_key . size ) ;
current_block_offset + = sizeof ( cache_key . size ) ;
offset_in_block = current_block_offset ;
memcpy ( reinterpret_cast < void * > ( current_block_offset_data ) , reinterpret_cast < const void * > ( cache_key . data ) , cache_key . size ) ;
current_block_offset + = cache_key . size ;
+ + keys_size ;
return true ;
}
2021-03-18 09:55:17 +00:00
inline bool writeKey ( const SSDCacheComplexKey & cache_key , size_t & offset_in_block )
2021-02-16 21:33:02 +00:00
{
assert ( cache_key . size > 0 ) ;
if ( ! enoughtPlaceToWriteKey ( cache_key ) )
return false ;
char * current_block_offset_data = block_data + current_block_offset ;
const StringRef & key = cache_key . key ;
/// Write complex key
memcpy ( reinterpret_cast < void * > ( current_block_offset_data ) , reinterpret_cast < const void * > ( & key . size ) , sizeof ( key . size ) ) ;
current_block_offset_data + = sizeof ( key . size ) ;
current_block_offset + = sizeof ( key . size ) ;
memcpy ( reinterpret_cast < void * > ( current_block_offset_data ) , reinterpret_cast < const void * > ( key . data ) , key . size ) ;
current_block_offset_data + = key . size ;
current_block_offset + = key . size ;
/// Write serialized columns size
memcpy ( reinterpret_cast < void * > ( current_block_offset_data ) , reinterpret_cast < const void * > ( & cache_key . size ) , sizeof ( cache_key . size ) ) ;
current_block_offset_data + = sizeof ( cache_key . size ) ;
current_block_offset + = sizeof ( cache_key . size ) ;
offset_in_block = current_block_offset ;
memcpy ( reinterpret_cast < void * > ( current_block_offset_data ) , reinterpret_cast < const void * > ( cache_key . data ) , cache_key . size ) ;
current_block_offset + = cache_key . size ;
+ + keys_size ;
return true ;
}
2021-03-18 09:55:17 +00:00
inline size_t getKeysSize ( ) const { return keys_size ; }
2021-02-16 21:33:02 +00:00
/// Write keys size into block header
2021-03-18 09:55:17 +00:00
inline void writeKeysSize ( )
2021-02-16 21:33:02 +00:00
{
char * keys_size_offset_data = block_data + block_header_check_sum_size ;
std : : memcpy ( keys_size_offset_data , & keys_size , sizeof ( size_t ) ) ;
}
/// Get check sum from block header
2021-03-18 09:55:17 +00:00
inline size_t getCheckSum ( ) const { return unalignedLoad < size_t > ( block_data ) ; }
2021-02-16 21:33:02 +00:00
/// Calculate check sum in block
2021-03-18 09:55:17 +00:00
inline size_t calculateCheckSum ( ) const
2021-02-16 21:33:02 +00:00
{
size_t calculated_check_sum = static_cast < size_t > ( CityHash_v1_0_2 : : CityHash64 ( block_data + block_header_check_sum_size , block_size - block_header_check_sum_size ) ) ;
return calculated_check_sum ;
}
/// Check if check sum from block header matched calculated check sum in block
2021-03-18 09:55:17 +00:00
inline bool checkCheckSum ( ) const
2021-02-16 21:33:02 +00:00
{
size_t calculated_check_sum = calculateCheckSum ( ) ;
size_t check_sum = getCheckSum ( ) ;
return calculated_check_sum = = check_sum ;
}
/// Write check sum in block header
2021-03-18 09:55:17 +00:00
inline void writeCheckSum ( )
2021-02-16 21:33:02 +00:00
{
size_t check_sum = static_cast < size_t > ( CityHash_v1_0_2 : : CityHash64 ( block_data + block_header_check_sum_size , block_size - block_header_check_sum_size ) ) ;
std : : memcpy ( block_data , & check_sum , sizeof ( size_t ) ) ;
}
2021-03-18 09:55:17 +00:00
inline size_t getBlockSize ( ) const { return block_size ; }
2021-02-16 21:33:02 +00:00
/// Returns block data
2021-03-18 09:55:17 +00:00
inline char * getBlockData ( ) const { return block_data ; }
2021-02-16 21:33:02 +00:00
/// Read keys that were serialized in block
/// It is client responsibility to ensure that simple or complex keys were written in block
void readSimpleKeys ( PaddedPODArray < UInt64 > & simple_keys ) const
{
char * block_start = block_data + block_header_size ;
char * block_end = block_data + block_size ;
static constexpr size_t key_prefix_size = sizeof ( UInt64 ) + sizeof ( size_t ) ;
while ( block_start + key_prefix_size < block_end )
{
UInt64 key = unalignedLoad < UInt64 > ( block_start ) ;
block_start + = sizeof ( UInt64 ) ;
size_t allocated_size = unalignedLoad < size_t > ( block_start ) ;
block_start + = sizeof ( size_t ) ;
/// If we read empty allocated size that means it is end of block
if ( allocated_size = = 0 )
break ;
simple_keys . emplace_back ( key ) ;
block_start + = allocated_size ;
}
}
void readComplexKeys ( PaddedPODArray < StringRef > & complex_keys ) const
{
char * block_start = block_data + block_header_size ;
char * block_end = block_data + block_size ;
static constexpr size_t key_prefix_size = sizeof ( size_t ) + sizeof ( size_t ) ;
while ( block_start + key_prefix_size < block_end )
{
size_t key_size = unalignedLoad < size_t > ( block_start ) ;
block_start + = sizeof ( key_size ) ;
StringRef complex_key ( block_start , key_size ) ;
block_start + = key_size ;
size_t allocated_size = unalignedLoad < size_t > ( block_start ) ;
block_start + = sizeof ( size_t ) ;
/// If we read empty allocated size that means it is end of block
if ( allocated_size = = 0 )
break ;
complex_keys . emplace_back ( complex_key ) ;
block_start + = allocated_size ;
}
}
private :
size_t block_size ;
2021-02-24 10:12:44 +00:00
char * block_data = nullptr ;
2021-02-16 21:33:02 +00:00
size_t current_block_offset = block_header_size ;
size_t keys_size = 0 ;
} ;
/// Position of serialized key data: index of the block and offset of the data inside that block.
struct SSDCacheIndex
{
    SSDCacheIndex(size_t block_index_, size_t offset_in_block_)
        : block_index{block_index_}
        , offset_in_block{offset_in_block_}
    {}

    SSDCacheIndex() = default;

    size_t block_index = 0;
    size_t offset_in_block = 0;
};

/// Two indexes are equal when they point to the same offset of the same block.
inline bool operator==(const SSDCacheIndex & lhs, const SSDCacheIndex & rhs)
{
    if (lhs.block_index != rhs.block_index)
        return false;

    return lhs.offset_in_block == rhs.offset_in_block;
}
2021-03-23 12:14:37 +00:00
/** Logically represents multiple memory_buffer_blocks_size SSDCacheBlocks and current write block.
2021-03-04 14:34:39 +00:00
* If key cannot be written into current_write_block , current block keys size and check summ is written
* and buffer increase index of current_write_block_index .
* If current_write_block_index = = memory_buffer_blocks_size write key will always returns true .
* If reset is called current_write_block_index is set to 0.
*/
2021-02-16 21:33:02 +00:00
template < typename SSDCacheKeyType >
2021-02-27 16:04:32 +00:00
class SSDCacheMemoryBuffer
2021-02-16 21:33:02 +00:00
{
public :
using KeyType = typename SSDCacheKeyType : : KeyType ;
2021-02-27 16:04:32 +00:00
explicit SSDCacheMemoryBuffer ( size_t block_size_ , size_t memory_buffer_blocks_size_ )
2021-02-16 21:33:02 +00:00
: block_size ( block_size_ )
2021-02-27 16:04:32 +00:00
, partition_blocks_size ( memory_buffer_blocks_size_ )
2021-02-25 14:39:05 +00:00
, buffer ( block_size * partition_blocks_size , 4096 )
2021-02-16 21:33:02 +00:00
, current_write_block ( block_size )
{
current_write_block . reset ( buffer . m_data ) ;
}
bool writeKey ( const SSDCacheKeyType & key , SSDCacheIndex & index )
{
if ( current_block_index = = partition_blocks_size )
return false ;
2021-02-19 13:32:26 +00:00
size_t block_offset = 0 ;
2021-02-16 21:33:02 +00:00
bool write_in_current_block = current_write_block . writeKey ( key , block_offset ) ;
if ( write_in_current_block )
{
index . block_index = current_block_index ;
index . offset_in_block = block_offset ;
return true ;
}
current_write_block . writeKeysSize ( ) ;
current_write_block . writeCheckSum ( ) ;
+ + current_block_index ;
if ( current_block_index = = partition_blocks_size )
return false ;
current_write_block . reset ( buffer . m_data + ( block_size * current_block_index ) ) ;
write_in_current_block = current_write_block . writeKey ( key , block_offset ) ;
assert ( write_in_current_block ) ;
index . block_index = current_block_index ;
index . offset_in_block = block_offset ;
return write_in_current_block ;
}
void writeKeysSizeAndCheckSumForCurrentWriteBlock ( )
{
current_write_block . writeKeysSize ( ) ;
current_write_block . writeCheckSum ( ) ;
}
inline char * getPlace ( SSDCacheIndex index ) const
{
return buffer . m_data + index . block_index * block_size + index . offset_in_block ;
}
inline size_t getCurrentBlockIndex ( ) const { return current_block_index ; }
inline const char * getData ( ) const { return buffer . m_data ; }
inline size_t getSizeInBytes ( ) const { return block_size * partition_blocks_size ; }
void readKeys ( PaddedPODArray < KeyType > & keys ) const
{
SSDCacheBlock block ( block_size ) ;
for ( size_t block_index = 0 ; block_index < partition_blocks_size ; + + block_index )
{
block . reset ( buffer . m_data + ( block_index * block_size ) ) ;
if constexpr ( std : : is_same_v < KeyType , UInt64 > )
block . readSimpleKeys ( keys ) ;
else
block . readComplexKeys ( keys ) ;
}
}
2021-02-27 16:04:32 +00:00
inline void reset ( )
2021-02-16 21:33:02 +00:00
{
current_block_index = 0 ;
current_write_block . reset ( buffer . m_data ) ;
}
const size_t block_size ;
const size_t partition_blocks_size ;
private :
Memory < Allocator < true > > buffer ;
SSDCacheBlock current_write_block ;
size_t current_block_index = 0 ;
} ;
2021-03-23 12:14:37 +00:00
/// Logically represents multiple memory_buffer_blocks_size SSDCacheBlocks on file system
2021-02-16 21:33:02 +00:00
template < typename SSDCacheKeyType >
class SSDCacheFileBuffer : private boost : : noncopyable
{
static constexpr auto BIN_FILE_EXT = " .bin " ;
public :
using KeyType = typename SSDCacheKeyType : : KeyType ;
explicit SSDCacheFileBuffer (
const std : : string & file_path_ ,
size_t block_size_ ,
2021-02-27 16:04:32 +00:00
size_t file_blocks_size_ )
2021-02-16 21:33:02 +00:00
: file_path ( file_path_ + BIN_FILE_EXT )
, block_size ( block_size_ )
, file_blocks_size ( file_blocks_size_ )
{
2021-02-17 19:12:00 +00:00
auto path = std : : filesystem : : path { file_path } ;
auto parent_path_directory = path . parent_path ( ) ;
/// If cache file is in directory that does not exists create it
if ( ! std : : filesystem : : exists ( parent_path_directory ) )
if ( ! std : : filesystem : : create_directories ( parent_path_directory ) )
throw Exception { " Failed to create directories. " , ErrorCodes : : CANNOT_CREATE_DIRECTORY } ;
2021-02-16 21:33:02 +00:00
ProfileEvents : : increment ( ProfileEvents : : FileOpen ) ;
file . fd = : : open ( file_path . c_str ( ) , O_RDWR | O_CREAT | O_TRUNC | O_DIRECT , 0666 ) ;
if ( file . fd = = - 1 )
{
auto error_code = ( errno = = ENOENT ) ? ErrorCodes : : FILE_DOESNT_EXIST : ErrorCodes : : CANNOT_OPEN_FILE ;
throwFromErrnoWithPath ( " Cannot open file " + file_path , file_path , error_code ) ;
}
allocateSizeForNextPartition ( ) ;
}
void allocateSizeForNextPartition ( )
{
if ( preallocateDiskSpace ( file . fd , current_blocks_size * block_size , block_size * file_blocks_size ) < 0 )
throwFromErrnoWithPath ( " Cannot preallocate space for the file " + file_path , file_path , ErrorCodes : : CANNOT_ALLOCATE_MEMORY ) ;
current_blocks_size + = file_blocks_size ;
}
bool writeBuffer ( const char * buffer , size_t buffer_size_in_blocks )
{
if ( current_block_index + buffer_size_in_blocks > current_blocks_size )
return false ;
AIOContext aio_context { 1 } ;
iocb write_request { } ;
iocb * write_request_ptr { & write_request } ;
# if defined(__FreeBSD__)
write_request . aio . aio_lio_opcode = LIO_WRITE ;
2021-02-21 12:50:55 +00:00
write_request . aio . aio_fildes = file . fd ;
2021-02-25 14:39:05 +00:00
write_request . aio . aio_buf = reinterpret_cast < volatile void * > ( const_cast < char * > ( buffer ) ) ;
2021-02-16 21:33:02 +00:00
write_request . aio . aio_nbytes = block_size * buffer_size_in_blocks ;
write_request . aio . aio_offset = current_block_index * block_size ;
# else
write_request . aio_lio_opcode = IOCB_CMD_PWRITE ;
write_request . aio_fildes = file . fd ;
write_request . aio_buf = reinterpret_cast < UInt64 > ( buffer ) ;
write_request . aio_nbytes = block_size * buffer_size_in_blocks ;
write_request . aio_offset = current_block_index * block_size ;
# endif
while ( io_submit ( aio_context . ctx , 1 , & write_request_ptr ) < 0 )
{
if ( errno ! = EINTR )
throw Exception ( " Cannot submit request for asynchronous IO on file " + file_path , ErrorCodes : : CANNOT_IO_SUBMIT ) ;
}
// CurrentMetrics::Increment metric_increment_write{CurrentMetrics::Write};
io_event event ;
while ( io_getevents ( aio_context . ctx , 1 , 1 , & event , nullptr ) < 0 )
{
if ( errno ! = EINTR )
throw Exception ( " Failed to wait for asynchronous IO completion on file " + file_path , ErrorCodes : : CANNOT_IO_GETEVENTS ) ;
}
// Unpoison the memory returned from an uninstrumented system function.
__msan_unpoison ( & event , sizeof ( event ) ) ;
auto bytes_written = eventResult ( event ) ;
ProfileEvents : : increment ( ProfileEvents : : WriteBufferAIOWrite ) ;
ProfileEvents : : increment ( ProfileEvents : : WriteBufferAIOWriteBytes , bytes_written ) ;
if ( bytes_written ! = static_cast < decltype ( bytes_written ) > ( block_size * buffer_size_in_blocks ) )
2021-02-21 12:50:55 +00:00
throw Exception ( " Not all data was written for asynchronous IO on file " + file_path + " . returned: " + std : : to_string ( bytes_written ) , ErrorCodes : : AIO_WRITE_ERROR ) ;
2021-02-16 21:33:02 +00:00
if ( : : fsync ( file . fd ) < 0 )
throwFromErrnoWithPath ( " Cannot fsync " + file_path , file_path , ErrorCodes : : CANNOT_FSYNC ) ;
current_block_index + = buffer_size_in_blocks ;
return true ;
}
bool readKeys ( size_t block_start , size_t blocks_length , PaddedPODArray < KeyType > & out ) const
{
if ( block_start + blocks_length > current_blocks_size )
return false ;
size_t buffer_size_in_bytes = blocks_length * block_size ;
Memory read_buffer_memory ( block_size * blocks_length , block_size ) ;
iocb request { } ;
iocb * request_ptr = & request ;
# if defined(__FreeBSD__)
request . aio . aio_lio_opcode = LIO_READ ;
2021-02-21 12:50:55 +00:00
request . aio . aio_fildes = file . fd ;
2021-02-16 21:33:02 +00:00
request . aio . aio_buf = reinterpret_cast < volatile void * > ( reinterpret_cast < UInt64 > ( read_buffer_memory . data ( ) ) ) ;
request . aio . aio_nbytes = buffer_size_in_bytes ;
request . aio . aio_offset = block_start * block_size ;
request . aio_data = 0 ;
# else
request . aio_lio_opcode = IOCB_CMD_PREAD ;
request . aio_fildes = file . fd ;
request . aio_buf = reinterpret_cast < UInt64 > ( read_buffer_memory . data ( ) ) ;
request . aio_nbytes = buffer_size_in_bytes ;
request . aio_offset = block_start * block_size ;
request . aio_data = 0 ;
# endif
io_event event { } ;
AIOContext aio_context ( 1 ) ;
while ( io_submit ( aio_context . ctx , 1 , & request_ptr ) ! = 1 )
{
if ( errno ! = EINTR )
throwFromErrno ( " io_submit: Failed to submit a request for asynchronous IO " , ErrorCodes : : CANNOT_IO_SUBMIT ) ;
}
while ( io_getevents ( aio_context . ctx , 1 , 1 , & event , nullptr ) ! = 1 )
{
if ( errno ! = EINTR )
throwFromErrno ( " io_getevents: Failed to get an event for asynchronous IO " , ErrorCodes : : CANNOT_IO_GETEVENTS ) ;
}
auto read_bytes = eventResult ( event ) ;
if ( read_bytes ! = static_cast < ssize_t > ( buffer_size_in_bytes ) )
2021-02-25 14:39:05 +00:00
throw Exception ( ErrorCodes : : AIO_READ_ERROR ,
" GC: AIO failed to read file ({}). Expected bytes ({}). Actual bytes ({}) " , file_path , buffer_size_in_bytes , read_bytes ) ;
2021-02-16 21:33:02 +00:00
SSDCacheBlock block ( block_size ) ;
for ( size_t i = 0 ; i < blocks_length ; + + i )
{
block . reset ( read_buffer_memory . data ( ) + ( i * block_size ) ) ;
if constexpr ( std : : is_same_v < SSDCacheKeyType , SSDCacheSimpleKey > )
block . readSimpleKeys ( out ) ;
else
block . readComplexKeys ( out ) ;
}
return true ;
}
template < typename FetchBlockFunc >
2021-03-17 19:01:45 +00:00
void fetchBlocks ( size_t read_from_file_buffer_blocks_size , const PaddedPODArray < size_t > & blocks_to_fetch , FetchBlockFunc & & func ) const
2021-02-16 21:33:02 +00:00
{
if ( blocks_to_fetch . empty ( ) )
return ;
2021-03-17 19:01:45 +00:00
Memory < Allocator < true > > read_buffer ( read_from_file_buffer_blocks_size * block_size , 4096 ) ;
2021-02-27 16:04:32 +00:00
size_t blocks_to_fetch_size = blocks_to_fetch . size ( ) ;
2021-02-16 21:33:02 +00:00
PaddedPODArray < iocb > requests ;
PaddedPODArray < iocb * > pointers ;
2021-02-27 16:04:32 +00:00
requests . reserve ( blocks_to_fetch_size ) ;
pointers . reserve ( blocks_to_fetch_size ) ;
2021-02-16 21:33:02 +00:00
2021-02-27 16:04:32 +00:00
for ( size_t block_to_fetch_index = 0 ; block_to_fetch_index < blocks_to_fetch_size ; + + block_to_fetch_index )
2021-02-16 21:33:02 +00:00
{
iocb request { } ;
2021-03-17 19:01:45 +00:00
char * buffer_place = read_buffer . data ( ) + block_size * ( block_to_fetch_index % read_from_file_buffer_blocks_size ) ;
2021-02-16 21:33:02 +00:00
# if defined(__FreeBSD__)
request . aio . aio_lio_opcode = LIO_READ ;
request . aio . aio_fildes = file . fd ;
request . aio . aio_buf = reinterpret_cast < volatile void * > ( reinterpret_cast < UInt64 > ( buffer_place ) ) ;
request . aio . aio_nbytes = block_size ;
request . aio . aio_offset = block_size * blocks_to_fetch [ block_to_fetch_index ] ;
request . aio_data = block_to_fetch_index ;
# else
request . aio_lio_opcode = IOCB_CMD_PREAD ;
request . aio_fildes = file . fd ;
request . aio_buf = reinterpret_cast < UInt64 > ( buffer_place ) ;
request . aio_nbytes = block_size ;
request . aio_offset = block_size * blocks_to_fetch [ block_to_fetch_index ] ;
request . aio_data = block_to_fetch_index ;
# endif
requests . push_back ( request ) ;
pointers . push_back ( & requests . back ( ) ) ;
}
AIOContext aio_context ( read_from_file_buffer_blocks_size ) ;
PaddedPODArray < bool > processed ( requests . size ( ) , false ) ;
2021-03-01 22:23:14 +00:00
PaddedPODArray < io_event > events ;
events . resize_fill ( requests . size ( ) ) ;
2021-02-16 21:33:02 +00:00
size_t to_push = 0 ;
size_t to_pop = 0 ;
while ( to_pop < requests . size ( ) )
{
int popped = 0 ;
while ( to_pop < to_push & & ( popped = io_getevents ( aio_context . ctx , to_push - to_pop , to_push - to_pop , & events [ to_pop ] , nullptr ) ) < = 0 )
{
if ( errno ! = EINTR )
throwFromErrno ( " io_getevents: Failed to get an event for asynchronous IO " , ErrorCodes : : CANNOT_IO_GETEVENTS ) ;
}
for ( size_t i = to_pop ; i < to_pop + popped ; + + i )
{
size_t block_to_fetch_index = events [ i ] . data ;
const auto & request = requests [ block_to_fetch_index ] ;
const ssize_t read_bytes = eventResult ( events [ i ] ) ;
if ( read_bytes ! = static_cast < ssize_t > ( block_size ) )
2021-02-25 14:39:05 +00:00
throw Exception ( ErrorCodes : : AIO_READ_ERROR ,
" GC: AIO failed to read file ({}). Expected bytes ({}). Actual bytes ({}) " , file_path , block_size , read_bytes ) ;
2021-02-16 21:33:02 +00:00
char * request_buffer = getRequestBuffer ( request ) ;
2021-02-19 10:30:53 +00:00
// Unpoison the memory returned from an uninstrumented system function.
__msan_unpoison ( request_buffer , block_size ) ;
2021-02-16 21:33:02 +00:00
SSDCacheBlock block ( block_size ) ;
block . reset ( request_buffer ) ;
if ( ! block . checkCheckSum ( ) )
{
std : : string calculated_check_sum = std : : to_string ( block . calculateCheckSum ( ) ) ;
std : : string check_sum = std : : to_string ( block . getCheckSum ( ) ) ;
throw Exception ( " Cache data corrupted. Checksum validation failed. Calculated " + calculated_check_sum + " in block " + check_sum , ErrorCodes : : CORRUPTED_DATA ) ;
}
std : : forward < FetchBlockFunc > ( func ) ( blocks_to_fetch [ block_to_fetch_index ] , block . getBlockData ( ) ) ;
processed [ block_to_fetch_index ] = true ;
}
while ( to_pop < requests . size ( ) & & processed [ to_pop ] )
+ + to_pop ;
/// add new io tasks
const int new_tasks_count = std : : min ( read_from_file_buffer_blocks_size - ( to_push - to_pop ) , requests . size ( ) - to_push ) ;
int pushed = 0 ;
while ( new_tasks_count > 0 & & ( pushed = io_submit ( aio_context . ctx , new_tasks_count , & pointers [ to_push ] ) ) < = 0 )
{
if ( errno ! = EINTR )
throwFromErrno ( " io_submit: Failed to submit a request for asynchronous IO " , ErrorCodes : : CANNOT_IO_SUBMIT ) ;
}
to_push + = pushed ;
}
}
inline size_t getCurrentBlockIndex ( ) const { return current_block_index ; }
inline void reset ( )
{
current_block_index = 0 ;
}
private :
struct FileDescriptor : private boost : : noncopyable
{
FileDescriptor ( ) = default ;
FileDescriptor ( FileDescriptor & & rhs ) : fd ( rhs . fd ) { rhs . fd = - 1 ; }
FileDescriptor & operator = ( FileDescriptor & & rhs )
{
close ( fd ) ;
fd = rhs . fd ;
rhs . fd = - 1 ;
}
~ FileDescriptor ( )
{
if ( fd ! = - 1 )
close ( fd ) ;
}
int fd = - 1 ;
} ;
2021-03-18 09:55:17 +00:00
inline static int preallocateDiskSpace ( int fd , size_t offset , size_t len )
2021-02-16 21:33:02 +00:00
{
# if defined(__FreeBSD__)
return posix_fallocate ( fd , offset , len ) ;
# else
return fallocate ( fd , 0 , offset , len ) ;
# endif
}
2021-03-18 09:55:17 +00:00
inline static char * getRequestBuffer ( const iocb & request )
2021-02-16 21:33:02 +00:00
{
char * result = nullptr ;
2021-02-25 14:39:05 +00:00
2021-02-16 21:33:02 +00:00
# if defined(__FreeBSD__)
2021-02-25 14:39:05 +00:00
result = reinterpret_cast < char * > ( reinterpret_cast < UInt64 > ( request . aio . aio_buf ) ) ;
2021-02-16 21:33:02 +00:00
# else
2021-02-25 14:39:05 +00:00
result = reinterpret_cast < char * > ( request . aio_buf ) ;
2021-02-16 21:33:02 +00:00
# endif
return result ;
}
2021-03-18 09:55:17 +00:00
inline static ssize_t eventResult ( io_event & event )
2021-02-16 21:33:02 +00:00
{
ssize_t bytes_written ;
# if defined(__FreeBSD__)
bytes_written = aio_return ( reinterpret_cast < struct aiocb * > ( event . udata ) ) ;
# else
bytes_written = event . res ;
# endif
return bytes_written ;
}
2021-02-25 14:39:05 +00:00
String file_path ;
2021-02-16 21:33:02 +00:00
size_t block_size ;
size_t file_blocks_size ;
FileDescriptor file ;
size_t current_block_index = 0 ;
size_t current_blocks_size = 0 ;
} ;
2021-03-23 12:14:37 +00:00
/** ICacheDictionaryStorage implementation that keeps serialized column data in memory buffers and disk partitions,
  * with an index that locates each key.
  * Data is first written into a memory buffer.
  * If the memory buffer is full, it is flushed to a disk partition.
  * If the memory buffer cannot be flushed to its associated disk partition, the storage allocates a new partition
  * when possible (current partition index < max_partitions_count); otherwise old partitions are reused.
  * The index maps each key to a partition block and an offset inside that block.
  */
2021-02-16 21:33:02 +00:00
template < DictionaryKeyType dictionary_key_type >
class SSDCacheDictionaryStorage final : public ICacheDictionaryStorage
{
public :
using SSDCacheKeyType = std : : conditional_t < dictionary_key_type = = DictionaryKeyType : : simple , SSDCacheSimpleKey , SSDCacheComplexKey > ;
using KeyType = std : : conditional_t < dictionary_key_type = = DictionaryKeyType : : simple , UInt64 , StringRef > ;
explicit SSDCacheDictionaryStorage ( const SSDCacheDictionaryStorageConfiguration & configuration_ )
: configuration ( configuration_ )
2021-02-27 16:04:32 +00:00
, file_buffer ( configuration_ . file_path , configuration . block_size , configuration . file_blocks_size )
2021-02-16 21:33:02 +00:00
, rnd_engine ( randomSeed ( ) )
{
memory_buffer_partitions . emplace_back ( configuration . block_size , configuration . write_buffer_blocks_size ) ;
}
2021-02-17 18:19:04 +00:00
/// This storage does not guarantee that fetched columns follow the order of requested keys.
bool returnsFetchedColumnsInOrderOfRequestedKeys() const override
{
    return false;
}
2021-02-17 11:48:06 +00:00
2021-02-26 15:56:41 +00:00
/// Storage name depends on whether this instantiation serves simple or complex keys.
String getName() const override
{
    return dictionary_key_type == DictionaryKeyType::simple ? "SSDCache" : "SSDComplexKeyCache";
}
2021-02-26 15:56:41 +00:00
2021-02-17 18:19:04 +00:00
bool supportsSimpleKeys ( ) const override { return dictionary_key_type = = DictionaryKeyType : : simple ; }
2021-02-16 21:33:02 +00:00
/// Fetches columns for simple keys; throws NOT_IMPLEMENTED for complex key storage.
SimpleKeysStorageFetchResult fetchColumnsForKeys(
    const PaddedPODArray<UInt64> & keys,
    const DictionaryStorageFetchRequest & fetch_request) override
{
    if constexpr (dictionary_key_type == DictionaryKeyType::simple)
        return fetchColumnsForKeysImpl<SimpleKeysStorageFetchResult>(keys, fetch_request);
    else
        throw Exception("Method fetchColumnsForKeys is not supported for complex key storage", ErrorCodes::NOT_IMPLEMENTED);
}
void insertColumnsForKeys ( const PaddedPODArray < UInt64 > & keys , Columns columns ) override
{
if constexpr ( dictionary_key_type = = DictionaryKeyType : : simple )
insertColumnsForKeysImpl ( keys , columns ) ;
else
throw Exception ( " Method insertColumnsForKeys is not supported for complex key storage " , ErrorCodes : : NOT_IMPLEMENTED ) ;
}
2021-03-03 18:58:43 +00:00
void insertDefaultKeys ( const PaddedPODArray < UInt64 > & keys ) override
{
if constexpr ( dictionary_key_type = = DictionaryKeyType : : simple )
insertDefaultKeysImpl ( keys ) ;
else
throw Exception ( " Method insertDefaultKeysImpl is not supported for complex key storage " , ErrorCodes : : NOT_IMPLEMENTED ) ;
}
2021-02-16 21:33:02 +00:00
/// Returns cached simple keys; throws NOT_IMPLEMENTED for complex key storage.
PaddedPODArray<UInt64> getCachedSimpleKeys() const override
{
    if constexpr (dictionary_key_type != DictionaryKeyType::simple)
        throw Exception("Method getCachedSimpleKeys is not supported for complex key storage", ErrorCodes::NOT_IMPLEMENTED);
    else
        return getCachedKeysImpl();
}
2021-02-17 18:19:04 +00:00
bool supportsComplexKeys ( ) const override { return dictionary_key_type = = DictionaryKeyType : : complex ; }
2021-02-16 21:33:02 +00:00
/// Fetches columns for complex keys; throws NOT_IMPLEMENTED for simple key storage.
ComplexKeysStorageFetchResult fetchColumnsForKeys(
    const PaddedPODArray<StringRef> & keys,
    const DictionaryStorageFetchRequest & fetch_request) override
{
    if constexpr (dictionary_key_type != DictionaryKeyType::complex)
        throw Exception("Method fetchColumnsForKeys is not supported for simple key storage", ErrorCodes::NOT_IMPLEMENTED);
    else
        return fetchColumnsForKeysImpl<ComplexKeysStorageFetchResult>(keys, fetch_request);
}
void insertColumnsForKeys ( const PaddedPODArray < StringRef > & keys , Columns columns ) override
{
if constexpr ( dictionary_key_type = = DictionaryKeyType : : complex )
insertColumnsForKeysImpl ( keys , columns ) ;
else
throw Exception ( " Method insertColumnsForKeys is not supported for simple key storage " , ErrorCodes : : NOT_IMPLEMENTED ) ;
}
2021-03-05 14:12:50 +00:00
void insertDefaultKeys ( const PaddedPODArray < StringRef > & keys ) override
2021-03-03 18:58:43 +00:00
{
if constexpr ( dictionary_key_type = = DictionaryKeyType : : complex )
insertDefaultKeysImpl ( keys ) ;
else
throw Exception ( " Method insertDefaultKeysImpl is not supported for simple key storage " , ErrorCodes : : NOT_IMPLEMENTED ) ;
}
2021-02-16 21:33:02 +00:00
/// Returns the cached complex keys, excluding default-value entries.
/// Only valid for complex-key storages; simple-key storages throw NOT_IMPLEMENTED.
/// (The message previously named getCachedSimpleKeys by copy-paste mistake.)
PaddedPODArray<StringRef> getCachedComplexKeys() const override
{
    if constexpr (dictionary_key_type == DictionaryKeyType::complex)
        return getCachedKeysImpl();
    else
        throw Exception("Method getCachedComplexKeys is not supported for simple key storage", ErrorCodes::NOT_IMPLEMENTED);
}
/// Number of keys in the index, including keys cached as default values.
size_t getSize() const override
{
    return index.size();
}
double getLoadFactor ( ) const override
{
size_t partitions_size = memory_buffer_partitions . size ( ) ;
if ( partitions_size = = configuration . max_partitions_count )
return 1.0 ;
auto & current_memory_partition = memory_buffer_partitions [ current_partition_index ] ;
size_t full_partitions = partitions_size - 1 ;
size_t blocks_in_memory = ( full_partitions * configuration . write_buffer_blocks_size ) + current_memory_partition . getCurrentBlockIndex ( ) ;
size_t blocks_on_disk = file_buffer . getCurrentBlockIndex ( ) ;
size_t max_blocks_size = ( configuration . file_blocks_size + configuration . write_buffer_blocks_size ) * configuration . max_partitions_count ;
double load_factor = static_cast < double > ( blocks_in_memory + blocks_on_disk ) / max_blocks_size ;
return load_factor ;
}
2021-02-17 18:19:04 +00:00
2021-02-16 21:33:02 +00:00
size_t getBytesAllocated ( ) const override
{
size_t memory_partitions_bytes_size = memory_buffer_partitions . size ( ) * configuration . write_buffer_blocks_size * configuration . block_size ;
size_t file_partitions_bytes_size = memory_buffer_partitions . size ( ) * configuration . file_blocks_size * configuration . block_size ;
2021-03-18 09:55:17 +00:00
return index . getBufferSizeInBytes ( ) + memory_partitions_bytes_size + file_partitions_bytes_size ;
2021-02-16 21:33:02 +00:00
}
private:
/// Wall-clock time point used when computing cell deadlines (same type as system_clock::time_point).
using TimePoint = std::chrono::time_point<std::chrono::system_clock>;
struct Cell
{
2021-03-03 18:58:43 +00:00
enum CellState
{
in_memory ,
on_disk ,
default_value
} ;
2021-03-17 19:01:45 +00:00
time_t deadline ;
2021-02-16 21:33:02 +00:00
SSDCacheIndex index ;
size_t in_memory_partition_index ;
2021-03-03 18:58:43 +00:00
CellState state ;
inline bool isInMemory ( ) const { return state = = in_memory ; }
inline bool isOnDisk ( ) const { return state = = on_disk ; }
inline bool isDefaultValue ( ) const { return state = = default_value ; }
2021-02-16 21:33:02 +00:00
} ;
2021-02-25 14:39:05 +00:00
/// Links a requested key (by its position in the fetch request's keys array) to the offset of
/// its serialized values inside an on-disk block.
struct KeyToBlockOffset
{
    KeyToBlockOffset(size_t requested_key_index, size_t offset_inside_block)
        : key_index(requested_key_index)
        , offset_in_block(offset_inside_block)
    {
    }

    size_t key_index = 0;
    size_t offset_in_block = 0;
};
template < typename Result >
2021-02-27 16:04:32 +00:00
Result fetchColumnsForKeysImpl (
2021-02-26 15:56:41 +00:00
const PaddedPODArray < KeyType > & keys ,
2021-02-27 16:04:32 +00:00
const DictionaryStorageFetchRequest & fetch_request ) const
2021-02-16 21:33:02 +00:00
{
2021-02-27 16:04:32 +00:00
Result result ;
2021-02-16 21:33:02 +00:00
result . fetched_columns = fetch_request . makeAttributesResultColumns ( ) ;
2021-03-18 09:55:17 +00:00
result . key_index_to_state . resize_fill ( keys . size ( ) ) ;
2021-02-16 21:33:02 +00:00
2021-03-17 19:01:45 +00:00
const time_t now = std : : chrono : : system_clock : : to_time_t ( std : : chrono : : system_clock : : now ( ) ) ;
2021-02-16 21:33:02 +00:00
size_t fetched_columns_index = 0 ;
2021-03-17 19:01:45 +00:00
using BlockIndexToKeysMap = absl : : flat_hash_map < size_t , PaddedPODArray < KeyToBlockOffset > , DefaultHash < size_t > > ;
2021-02-16 21:33:02 +00:00
BlockIndexToKeysMap block_to_keys_map ;
2021-02-27 16:04:32 +00:00
absl : : flat_hash_set < size_t , DefaultHash < size_t > > unique_blocks_to_request ;
PaddedPODArray < size_t > blocks_to_request ;
2021-02-16 21:33:02 +00:00
2021-03-17 19:01:45 +00:00
time_t strict_max_lifetime_seconds = static_cast < time_t > ( configuration . strict_max_lifetime_seconds ) ;
2021-02-27 16:04:32 +00:00
size_t keys_size = keys . size ( ) ;
2021-03-17 19:01:45 +00:00
for ( size_t attribute_size = 0 ; attribute_size < fetch_request . attributesSize ( ) ; + + attribute_size )
if ( fetch_request . shouldFillResultColumnWithIndex ( attribute_size ) )
result . fetched_columns [ attribute_size ] - > reserve ( keys_size ) ;
2021-02-27 16:04:32 +00:00
for ( size_t key_index = 0 ; key_index < keys_size ; + + key_index )
2021-02-16 21:33:02 +00:00
{
auto key = keys [ key_index ] ;
2021-03-18 09:55:17 +00:00
const auto * it = index . find ( key ) ;
2021-02-16 21:33:02 +00:00
2021-03-05 14:12:50 +00:00
if ( ! it )
2021-02-16 21:33:02 +00:00
{
2021-03-05 14:12:50 +00:00
+ + result . not_found_keys_size ;
continue ;
}
2021-02-16 21:33:02 +00:00
2021-03-05 14:12:50 +00:00
const auto & cell = it - > getMapped ( ) ;
2021-02-19 10:30:53 +00:00
2021-03-18 09:55:17 +00:00
if ( unlikely ( now > cell . deadline + strict_max_lifetime_seconds ) )
2021-03-05 14:12:50 +00:00
{
+ + result . not_found_keys_size ;
continue ;
}
2021-02-27 16:04:32 +00:00
2021-03-05 14:12:50 +00:00
bool cell_is_expired = false ;
KeyState : : State key_state = KeyState : : found ;
2021-02-26 15:56:41 +00:00
2021-03-17 19:01:45 +00:00
if ( now > cell . deadline )
2021-03-05 14:12:50 +00:00
{
cell_is_expired = true ;
key_state = KeyState : : expired ;
}
2021-03-18 09:55:17 +00:00
result . expired_keys_size + = static_cast < size_t > ( cell_is_expired ) ;
result . found_keys_size + = static_cast < size_t > ( ! cell_is_expired ) ;
2021-02-26 15:56:41 +00:00
2021-03-05 14:12:50 +00:00
switch ( cell . state )
{
case Cell : : in_memory :
2021-02-27 16:04:32 +00:00
{
2021-03-05 14:12:50 +00:00
result . key_index_to_state [ key_index ] = { key_state , fetched_columns_index } ;
+ + fetched_columns_index ;
2021-02-27 16:04:32 +00:00
2021-03-05 14:12:50 +00:00
const auto & partition = memory_buffer_partitions [ cell . in_memory_partition_index ] ;
char * serialized_columns_place = partition . getPlace ( cell . index ) ;
deserializeAndInsertIntoColumns ( result . fetched_columns , fetch_request , serialized_columns_place ) ;
break ;
}
case Cell : : on_disk :
{
2021-03-17 19:01:45 +00:00
PaddedPODArray < KeyToBlockOffset > & keys_block = block_to_keys_map [ cell . index . block_index ] ;
2021-03-18 09:55:17 +00:00
keys_block . emplace_back ( key_index , cell . index . offset_in_block ) ;
2021-02-26 15:56:41 +00:00
2021-03-18 09:55:17 +00:00
KeyState : : State state = cell_is_expired ? KeyState : : expired : KeyState : : found ;
/// Fetched column index will be set later during fetch blocks
result . key_index_to_state [ key_index ] = { state , 0 } ;
auto insert_result = unique_blocks_to_request . insert ( cell . index . block_index ) ;
bool was_inserted = insert_result . second ;
if ( was_inserted )
2021-03-05 14:12:50 +00:00
blocks_to_request . emplace_back ( cell . index . block_index ) ;
2021-03-18 09:55:17 +00:00
2021-03-05 14:12:50 +00:00
break ;
}
case Cell : : default_value :
{
result . key_index_to_state [ key_index ] = { key_state , fetched_columns_index } ;
result . key_index_to_state [ key_index ] . setDefault ( ) ;
+ + fetched_columns_index ;
+ + result . default_keys_size ;
insertDefaultValuesIntoColumns ( result . fetched_columns , fetch_request , key_index ) ;
break ;
2021-02-16 21:33:02 +00:00
}
}
}
/// Sort blocks by offset before start async io requests
std : : sort ( blocks_to_request . begin ( ) , blocks_to_request . end ( ) ) ;
2021-03-17 19:01:45 +00:00
file_buffer . fetchBlocks ( configuration . read_buffer_blocks_size , blocks_to_request , [ & ] ( size_t block_index , char * block_data )
2021-02-16 21:33:02 +00:00
{
2021-03-08 22:03:25 +00:00
auto & keys_in_block = block_to_keys_map [ block_index ] ;
2021-02-16 21:33:02 +00:00
for ( auto & key_in_block : keys_in_block )
{
char * key_data = block_data + key_in_block . offset_in_block ;
deserializeAndInsertIntoColumns ( result . fetched_columns , fetch_request , key_data ) ;
2021-03-18 09:55:17 +00:00
result . key_index_to_state [ key_in_block . key_index ] . setFetchedColumnIndex ( fetched_columns_index ) ;
2021-02-16 21:33:02 +00:00
+ + fetched_columns_index ;
}
} ) ;
2021-02-27 16:04:32 +00:00
return result ;
2021-02-16 21:33:02 +00:00
}
void insertColumnsForKeysImpl ( const PaddedPODArray < KeyType > & keys , Columns columns )
{
size_t columns_to_serialize_size = columns . size ( ) ;
PaddedPODArray < StringRef > temporary_column_data ( columns_to_serialize_size ) ;
Arena temporary_values_pool ;
const auto now = std : : chrono : : system_clock : : now ( ) ;
for ( size_t key_index = 0 ; key_index < keys . size ( ) ; + + key_index )
{
size_t allocated_size_for_columns = 0 ;
const char * block_start = nullptr ;
auto key = keys [ key_index ] ;
for ( size_t column_index = 0 ; column_index < columns_to_serialize_size ; + + column_index )
{
auto & column = columns [ column_index ] ;
temporary_column_data [ column_index ] = column - > serializeValueIntoArena ( key_index , temporary_values_pool , block_start ) ;
allocated_size_for_columns + = temporary_column_data [ column_index ] . size ;
}
SSDCacheKeyType ssd_cache_key { key , allocated_size_for_columns , block_start } ;
if ( ! SSDCacheBlock : : canBeWrittenInEmptyBlock ( ssd_cache_key , configuration . block_size ) )
throw Exception ( " Serialized columns size is greater than allowed block size and metadata " , ErrorCodes : : UNSUPPORTED_METHOD ) ;
/// We cannot reuse place that is already allocated in file or memory cache so we erase key from index
2021-03-18 09:55:17 +00:00
eraseKeyFromIndex ( key ) ;
2021-02-16 21:33:02 +00:00
Cell cell ;
setCellDeadline ( cell , now ) ;
if constexpr ( dictionary_key_type = = DictionaryKeyType : : complex )
{
/// Copy complex key into arena and put in cache
size_t key_size = key . size ;
char * place_for_key = complex_key_arena . alloc ( key_size ) ;
memcpy ( reinterpret_cast < void * > ( place_for_key ) , reinterpret_cast < const void * > ( key . data ) , key_size ) ;
KeyType updated_key { place_for_key , key_size } ;
ssd_cache_key . key = updated_key ;
}
insertCell ( ssd_cache_key , cell ) ;
temporary_values_pool . rollback ( allocated_size_for_columns ) ;
}
}
2021-03-03 18:58:43 +00:00
void insertDefaultKeysImpl ( const PaddedPODArray < KeyType > & keys )
{
const auto now = std : : chrono : : system_clock : : now ( ) ;
for ( auto key : keys )
{
2021-03-18 09:55:17 +00:00
eraseKeyFromIndex ( key ) ;
2021-03-03 18:58:43 +00:00
Cell cell ;
setCellDeadline ( cell , now ) ;
2021-03-05 14:12:50 +00:00
cell . index = { 0 , 0 } ;
2021-03-03 18:58:43 +00:00
cell . in_memory_partition_index = 0 ;
cell . state = Cell : : default_value ;
2021-03-05 14:12:50 +00:00
2021-03-03 18:58:43 +00:00
if constexpr ( dictionary_key_type = = DictionaryKeyType : : complex )
{
/// Copy complex key into arena and put in cache
size_t key_size = key . size ;
char * place_for_key = complex_key_arena . alloc ( key_size ) ;
memcpy ( reinterpret_cast < void * > ( place_for_key ) , reinterpret_cast < const void * > ( key . data ) , key_size ) ;
KeyType updated_key { place_for_key , key_size } ;
key = updated_key ;
}
2021-03-18 09:55:17 +00:00
index [ key ] = cell ;
2021-03-03 18:58:43 +00:00
}
}
2021-02-16 21:33:02 +00:00
/// Returns every key in the index except those cached as default values.
PaddedPODArray<KeyType> getCachedKeysImpl() const
{
    PaddedPODArray<KeyType> cached_keys;
    cached_keys.reserve(index.size());

    for (const auto & node : index)
        if (!node.getMapped().isDefaultValue())
            cached_keys.emplace_back(node.getKey());

    return cached_keys;
}
/** Writes the key with its serialized values into the cache and points the index at it.

    Flow:
    1. Try to write the key into the current memory buffer partition. On success, store an
       in_memory cell and return.
    2. Otherwise the memory buffer is full, so flush it to the file. If old partitions are already
       being reused, first erase from the index every on-disk key whose block is about to be
       overwritten. On a successful flush, switch the flushed keys to on_disk, reset the memory
       buffer, write the key into it and return.
    3. If the flush failed, the current file partition is full. Allocate the next partition (while
       fewer than max_partitions_count exist) or start reusing old ones, then retry from step 1.

    @param ssd_cache_key key plus serialized values. For complex keys the caller has already copied
                         the key into complex_key_arena.
    @param cell          cell whose deadline is already set; state, index and
                         in_memory_partition_index are filled in here.
  */
void insertCell(SSDCacheKeyType & ssd_cache_key, Cell & cell)
{
    SSDCacheIndex cache_index{0, 0};

    while (true)
    {
        bool started_reusing_old_partitions = memory_buffer_partitions.size() == configuration.max_partitions_count;

        /// Re-taken on every iteration: emplace_back below may reallocate memory_buffer_partitions.
        auto & current_memory_buffer_partition = memory_buffer_partitions[current_partition_index];

        bool write_into_memory_buffer_result = current_memory_buffer_partition.writeKey(ssd_cache_key, cache_index);
        if (write_into_memory_buffer_result)
        {
            cell.state = Cell::in_memory;
            cell.index = cache_index;
            cell.in_memory_partition_index = current_partition_index;
            index[ssd_cache_key.key] = cell;
            break;
        }

        /// Memory buffer partition is full: flush it to disk and retry.
        size_t block_index_in_file_before_write = file_buffer.getCurrentBlockIndex();

        if (started_reusing_old_partitions)
        {
            /// The flush overwrites the file blocks in
            /// [before_write, before_write + write_buffer_blocks_size),
            /// so keys whose index entry still points into them must be removed first.
            PaddedPODArray<KeyType> old_keys;
            file_buffer.readKeys(block_index_in_file_before_write, configuration.write_buffer_blocks_size, old_keys);

            size_t file_read_end_block_index = block_index_in_file_before_write + configuration.write_buffer_blocks_size;

            for (auto old_key : old_keys)
            {
                auto * it = index.find(old_key);
                if (!it)
                    continue;

                const Cell & old_key_cell = it->getMapped();
                size_t old_key_block = old_key_cell.index.block_index;

                /// Check if the key in the index refers to the old partition blocks being overwritten.
                if (old_key_cell.isOnDisk() &&
                    old_key_block >= block_index_in_file_before_write &&
                    old_key_block < file_read_end_block_index)
                    eraseKeyFromIndex(old_key);
            }
        }

        const char * partition_data = current_memory_buffer_partition.getData();
        bool flush_to_file_result = file_buffer.writeBuffer(partition_data, configuration.write_buffer_blocks_size);

        if (flush_to_file_result)
        {
            /// Update index cells: block index of flushed keys becomes an absolute block index in the file.
            PaddedPODArray<KeyType> keys_to_update;
            current_memory_buffer_partition.readKeys(keys_to_update);

            /// This buffer can hold stale copies of a key. The key may have been rewritten later into
            /// this same partition (duplicates) or into ANOTHER partition's memory buffer. The cell
            /// always describes the latest copy, so only cells that are in_memory in THIS partition
            /// are moved to disk. Updating a cell from another partition would point it at the wrong
            /// file location. The first update sets state to on_disk, so later duplicates are skipped.
            for (auto key_to_update : keys_to_update)
            {
                auto * it = index.find(key_to_update);
                if (!it)
                    continue;

                Cell & cell_to_update = it->getMapped();
                if (cell_to_update.state != Cell::in_memory ||
                    cell_to_update.in_memory_partition_index != current_partition_index)
                    continue;

                cell_to_update.state = Cell::on_disk;
                cell_to_update.index.block_index += block_index_in_file_before_write;
            }

            /// Memory buffer partition flushed to disk, start reusing it.
            current_memory_buffer_partition.reset();
            memset(const_cast<char *>(current_memory_buffer_partition.getData()), 0, current_memory_buffer_partition.getSizeInBytes());

            write_into_memory_buffer_result = current_memory_buffer_partition.writeKey(ssd_cache_key, cache_index);
            assert(write_into_memory_buffer_result);

            cell.state = Cell::in_memory;
            cell.index = cache_index;
            cell.in_memory_partition_index = current_partition_index;
            index[ssd_cache_key.key] = cell;
            break;
        }

        /// Current file partition is full, move on to the next partition.
        if (memory_buffer_partitions.size() < configuration.max_partitions_count)
        {
            /// Try to create the next partition without reusing old partitions.
            ++current_partition_index;
            file_buffer.allocateSizeForNextPartition();
            memory_buffer_partitions.emplace_back(configuration.block_size, configuration.write_buffer_blocks_size);
        }
        else
        {
            /// Start reusing old partitions.
            current_partition_index = (current_partition_index + 1) % memory_buffer_partitions.size();
            file_buffer.reset();
        }
    }
}
/// Sets cell.deadline as seconds since the epoch.
/// With lifetime min_sec == max_sec == 0 the cell never expires in practice. Its deadline is the
/// maximum TimePoint minus 2 * strict_max_lifetime_seconds, presumably so that
/// `deadline + strict_max_lifetime_seconds` cannot overflow when checked.
/// Otherwise the deadline is `now` plus a lifetime drawn uniformly from [min_sec, max_sec].
inline void setCellDeadline(Cell & cell, TimePoint now)
{
    const size_t min_sec_lifetime = configuration.lifetime.min_sec;
    const size_t max_sec_lifetime = configuration.lifetime.max_sec;

    TimePoint deadline;

    if (min_sec_lifetime == 0 && max_sec_lifetime == 0)
    {
        deadline = TimePoint::max() - 2 * std::chrono::seconds(configuration.strict_max_lifetime_seconds);
    }
    else
    {
        std::uniform_int_distribution<UInt64> distribution{min_sec_lifetime, max_sec_lifetime};
        deadline = now + std::chrono::seconds(distribution(rnd_engine));
    }

    cell.deadline = std::chrono::system_clock::to_time_t(deadline);
}
/// Removes `key` from the index if it is present.
/// For complex keys, the key bytes owned by the index (copied into complex_key_arena on insertion)
/// are returned to the arena.
inline void eraseKeyFromIndex(KeyType key)
{
    auto * it = index.find(key);
    if (!it)
        return;

    /// Capture the stored key before erasing: for complex keys it points into complex_key_arena,
    /// and the hash table slot is gone after erase.
    const KeyType stored_key = it->getKey();
    index.erase(key);

    if constexpr (std::is_same_v<KeyType, StringRef>)
        complex_key_arena.free(const_cast<char *>(stored_key.data), stored_key.size);
}
SSDCacheDictionaryStorageConfiguration configuration ;
SSDCacheFileBuffer < SSDCacheKeyType > file_buffer ;
2021-02-27 16:04:32 +00:00
std : : vector < SSDCacheMemoryBuffer < SSDCacheKeyType > > memory_buffer_partitions ;
2021-02-16 21:33:02 +00:00
pcg64 rnd_engine ;
2021-03-18 09:55:17 +00:00
using SimpleKeyHashMap = HashMap < UInt64 , Cell > ;
using ComplexKeyHashMap = HashMapWithSavedHash < StringRef , Cell > ;
2021-02-16 21:33:02 +00:00
2021-03-18 09:55:17 +00:00
using CacheMap = std : : conditional_t <
2021-02-16 21:33:02 +00:00
dictionary_key_type = = DictionaryKeyType : : simple ,
2021-03-18 09:55:17 +00:00
SimpleKeyHashMap ,
ComplexKeyHashMap > ;
2021-02-16 21:33:02 +00:00
ArenaWithFreeLists complex_key_arena ;
2021-03-18 09:55:17 +00:00
CacheMap index ;
2021-02-17 19:02:28 +00:00
2021-02-16 21:33:02 +00:00
size_t current_partition_index = 0 ;
} ;
}
2021-02-21 12:50:55 +00:00
# endif