2019-10-25 18:06:08 +00:00
# include "SSDCacheDictionary.h"
2020-01-03 19:52:07 +00:00
# include <algorithm>
2019-10-25 18:06:08 +00:00
# include <Columns/ColumnsNumber.h>
2020-01-01 17:40:46 +00:00
# include <Common/typeid_cast.h>
2020-01-02 19:33:19 +00:00
# include <Common/ProfileEvents.h>
# include <Common/ProfilingScopedRWLock.h>
2020-01-07 14:59:03 +00:00
# include <Common/MemorySanitizer.h>
2020-01-01 17:40:46 +00:00
# include <DataStreams/IBlockInputStream.h>
2020-01-12 12:29:42 +00:00
# include <Poco/File.h>
2020-01-18 11:47:58 +00:00
# include "DictionaryBlockInputStream.h"
2020-01-05 13:59:49 +00:00
# include "DictionaryFactory.h"
2020-01-06 20:38:32 +00:00
# include <IO/AIO.h>
# include <IO/ReadHelpers.h>
2020-01-04 15:04:16 +00:00
# include <IO/WriteHelpers.h>
2020-01-01 17:40:46 +00:00
# include <ext/chrono_io.h>
# include <ext/map.h>
# include <ext/range.h>
# include <ext/size.h>
2020-01-06 20:38:32 +00:00
# include <ext/bit_cast.h>
2020-01-07 17:55:32 +00:00
# include <numeric>
2020-01-12 11:32:43 +00:00
# include <filesystem>
2019-10-25 18:06:08 +00:00
2020-01-02 19:33:19 +00:00
/// Profile event counters; they are only declared here (extern), not defined.
namespace ProfileEvents
{
    /// Dictionary cache counters.
    extern const Event DictCacheKeysRequested;
    extern const Event DictCacheKeysRequestedMiss;
    extern const Event DictCacheKeysRequestedFound;
    extern const Event DictCacheKeysExpired;
    extern const Event DictCacheKeysNotFound;
    extern const Event DictCacheKeysHit;
    extern const Event DictCacheRequestTimeNs;
    extern const Event DictCacheRequests;
    extern const Event DictCacheLockWriteNs;
    extern const Event DictCacheLockReadNs;

    /// File and asynchronous write counters.
    extern const Event FileOpen;
    extern const Event WriteBufferAIOWrite;
    extern const Event WriteBufferAIOWriteBytes;
}
/// Gauge-style metrics; they are only declared here (extern), not defined.
namespace CurrentMetrics
{
    extern const Metric DictCacheRequests;
    /// Incremented for the duration of the asynchronous write in CachePartition::flush.
    extern const Metric Write;
}
2019-10-25 18:06:08 +00:00
namespace DB
{
2020-01-01 17:40:46 +00:00
/// Error codes thrown from this file; they are only declared here (extern), not defined.
namespace ErrorCodes
{
    extern const int TYPE_MISMATCH;
    extern const int BAD_ARGUMENTS;
    extern const int UNSUPPORTED_METHOD;
    extern const int LOGICAL_ERROR;
    extern const int TOO_SMALL_BUFFER_SIZE;
    extern const int FILE_DOESNT_EXIST;
    extern const int CANNOT_OPEN_FILE;
    extern const int CANNOT_IO_SUBMIT;
    extern const int CANNOT_IO_GETEVENTS;
    extern const int AIO_WRITE_ERROR;
    extern const int CANNOT_FSYNC;
    /// Used by the CachePartition constructor when disk space preallocation fails;
    /// declared explicitly so this file does not rely on a transitive include for it.
    extern const int CANNOT_ALLOCATE_MEMORY;
}
2020-01-03 19:52:07 +00:00
namespace
2020-01-01 17:40:46 +00:00
{
2020-01-12 14:23:32 +00:00
constexpr size_t DEFAULT_SSD_BLOCK_SIZE = DEFAULT_AIO_FILE_BLOCK_SIZE ;
constexpr size_t DEFAULT_FILE_SIZE = 4 * 1024 * 1024 * 1024ULL ;
constexpr size_t DEFAULT_PARTITIONS_COUNT = 16 ;
constexpr size_t DEFAULT_READ_BUFFER_SIZE = 16 * DEFAULT_SSD_BLOCK_SIZE ;
2020-01-19 08:49:40 +00:00
constexpr size_t DEFAULT_WRITE_BUFFER_SIZE = DEFAULT_SSD_BLOCK_SIZE ;
2020-01-07 17:55:32 +00:00
2020-01-12 14:23:32 +00:00
constexpr size_t BUFFER_ALIGNMENT = DEFAULT_AIO_FILE_BLOCK_SIZE ;
2020-01-11 07:20:48 +00:00
2020-01-06 20:38:32 +00:00
static constexpr UInt64 KEY_METADATA_EXPIRES_AT_MASK = std : : numeric_limits < std : : chrono : : system_clock : : time_point : : rep > : : max ( ) ;
static constexpr UInt64 KEY_METADATA_IS_DEFAULT_MASK = ~ KEY_METADATA_EXPIRES_AT_MASK ;
constexpr size_t KEY_IN_MEMORY_BIT = 63 ;
constexpr size_t KEY_IN_MEMORY = ( 1ULL < < KEY_IN_MEMORY_BIT ) ;
constexpr size_t BLOCK_INDEX_BITS = 32 ;
constexpr size_t INDEX_IN_BLOCK_BITS = 16 ;
constexpr size_t INDEX_IN_BLOCK_MASK = ( 1ULL < < INDEX_IN_BLOCK_BITS ) - 1 ;
constexpr size_t BLOCK_INDEX_MASK = ( ( 1ULL < < ( BLOCK_INDEX_BITS + INDEX_IN_BLOCK_BITS ) ) - 1 ) ^ INDEX_IN_BLOCK_MASK ;
2020-01-08 17:52:13 +00:00
constexpr size_t NOT_EXISTS = - 1 ;
2020-01-06 20:38:32 +00:00
2020-01-11 20:23:51 +00:00
constexpr UInt8 HAS_NOT_FOUND = 2 ;
2020-01-03 19:52:07 +00:00
const std : : string BIN_FILE_EXT = " .bin " ;
const std : : string IND_FILE_EXT = " .idx " ;
2020-01-11 07:20:48 +00:00
int preallocateDiskSpace ( int fd , size_t len )
{
# if defined(__FreeBSD__)
return posix_fallocate ( fd , 0 , len ) ;
# else
return fallocate ( fd , 0 , 0 , len ) ;
# endif
}
2020-01-01 17:40:46 +00:00
}
2020-01-08 12:40:29 +00:00
/// Returns the expiration time stored in the metadata word, with the "default" flag bits masked out.
CachePartition::Metadata::time_point_t CachePartition::Metadata::expiresAt() const
{
    const auto expiration_bits = data & KEY_METADATA_EXPIRES_AT_MASK;
    return ext::safe_bit_cast<time_point_t>(expiration_bits);
}
2020-01-08 12:40:29 +00:00
void CachePartition : : Metadata : : setExpiresAt ( const time_point_t & t )
2020-01-06 20:38:32 +00:00
{
data = ext : : safe_bit_cast < time_point_urep_t > ( t ) ;
}
2020-01-08 12:40:29 +00:00
bool CachePartition : : Metadata : : isDefault ( ) const
2020-01-06 20:38:32 +00:00
{
return ( data & KEY_METADATA_IS_DEFAULT_MASK ) = = KEY_METADATA_IS_DEFAULT_MASK ;
}
2020-01-08 12:40:29 +00:00
void CachePartition : : Metadata : : setDefault ( )
2020-01-06 20:38:32 +00:00
{
data | = KEY_METADATA_IS_DEFAULT_MASK ;
}
bool CachePartition : : Index : : inMemory ( ) const
{
return ( index & KEY_IN_MEMORY ) = = KEY_IN_MEMORY ;
}
bool CachePartition : : Index : : exists ( ) const
{
2020-01-08 17:52:13 +00:00
return index ! = NOT_EXISTS ;
2020-01-06 20:38:32 +00:00
}
void CachePartition : : Index : : setNotExists ( )
{
2020-01-08 17:52:13 +00:00
index = NOT_EXISTS ;
2020-01-06 20:38:32 +00:00
}
void CachePartition : : Index : : setInMemory ( const bool in_memory )
{
index = ( index & ~ KEY_IN_MEMORY ) | ( static_cast < size_t > ( in_memory ) < < KEY_IN_MEMORY_BIT ) ;
}
size_t CachePartition : : Index : : getAddressInBlock ( ) const
{
return index & INDEX_IN_BLOCK_MASK ;
}
void CachePartition : : Index : : setAddressInBlock ( const size_t address_in_block )
{
index = ( index & ~ INDEX_IN_BLOCK_MASK ) | address_in_block ;
}
size_t CachePartition : : Index : : getBlockId ( ) const
{
return ( index & BLOCK_INDEX_MASK ) > > INDEX_IN_BLOCK_BITS ;
}
void CachePartition : : Index : : setBlockId ( const size_t block_id )
{
index = ( index & ~ BLOCK_INDEX_MASK ) | ( block_id < < INDEX_IN_BLOCK_BITS ) ;
}
2020-01-05 13:59:49 +00:00
/** Creates a cache partition backed by the file `path + BIN_FILE_EXT`.
  *
  * Creates `dir_path` if it does not exist, opens (creating and truncating) the data file
  * with O_DIRECT and preallocates `max_size * block_size` bytes for it.
  *
  * Throws FILE_DOESNT_EXIST / CANNOT_OPEN_FILE when the file cannot be opened and
  * CANNOT_ALLOCATE_MEMORY when the preallocation fails.
  */
CachePartition::CachePartition(
    const AttributeUnderlyingType & /* key_structure */,
    const std::vector<AttributeUnderlyingType> & attributes_structure_,
    const std::string & dir_path,
    const size_t file_id_,
    const size_t max_size_,
    const size_t block_size_,
    const size_t read_buffer_size_,
    const size_t write_buffer_size_)
    : file_id(file_id_)
    , max_size(max_size_)
    , block_size(block_size_)
    , read_buffer_size(read_buffer_size_)
    , write_buffer_size(write_buffer_size_)
    , path(dir_path + " / " + std::to_string(file_id))
    , attributes_structure(attributes_structure_)
{
    /// The buffer of pending keys always holds UInt64 ids.
    keys_buffer.type = AttributeUnderlyingType::utUInt64;
    keys_buffer.values = CachePartition::Attribute::Container<UInt64>();

    Poco::File directory(dir_path);
    if (!directory.exists())
        directory.createDirectory();

    {
        ProfileEvents::increment(ProfileEvents::FileOpen);

        const std::string filename = path + BIN_FILE_EXT;
        fd = ::open(filename.c_str(), O_RDWR | O_CREAT | O_TRUNC | O_DIRECT, 0666);
        if (fd == -1)
        {
            auto error_code = (errno == ENOENT) ? ErrorCodes::FILE_DOESNT_EXIST : ErrorCodes::CANNOT_OPEN_FILE;
            throwFromErrnoWithPath(" Cannot open file " + filename, filename, error_code);
        }

        if (preallocateDiskSpace(fd, max_size * block_size) < 0)
        {
            /// The destructor does not run when the constructor throws, so the descriptor
            /// has to be closed here. Keep the errno of the failed preallocation for the message.
            const int preallocation_errno = errno;
            ::close(fd);
            errno = preallocation_errno;
            throwFromErrnoWithPath(" Cannot preallocate space for the file " + filename, filename, ErrorCodes::CANNOT_ALLOCATE_MEMORY);
        }
    }
}
2020-01-07 11:26:52 +00:00
/// Takes the partition lock exclusively and closes the data file descriptor.
CachePartition::~CachePartition()
{
    std::unique_lock guard(rw_lock);
    ::close(fd);
}
2019-10-25 18:06:08 +00:00
2020-02-01 10:12:35 +00:00
size_t CachePartition : : appendDefaults (
const Attribute & new_keys , const PaddedPODArray < Metadata > & metadata , const size_t begin )
{
std : : unique_lock lock ( rw_lock ) ;
const auto & ids = std : : get < Attribute : : Container < UInt64 > > ( new_keys . values ) ;
for ( size_t index = begin ; index < ids . size ( ) ; + + index )
{
auto & index_and_metadata = key_to_index_and_metadata [ ids [ index ] ] ;
index_and_metadata . metadata = metadata [ index ] ;
index_and_metadata . metadata . setDefault ( ) ;
}
return ids . size ( ) - begin ;
}
2020-01-11 16:38:43 +00:00
size_t CachePartition : : appendBlock (
2020-02-01 10:12:35 +00:00
const Attribute & new_keys , const Attributes & new_attributes , const PaddedPODArray < Metadata > & metadata , const size_t begin )
2020-01-03 19:52:07 +00:00
{
2020-01-09 19:34:03 +00:00
std : : unique_lock lock ( rw_lock ) ;
2020-01-12 14:23:32 +00:00
if ( current_file_block_id > = max_size )
2020-01-11 11:19:12 +00:00
return 0 ;
2020-01-08 17:52:13 +00:00
if ( new_attributes . size ( ) ! = attributes_structure . size ( ) )
2020-01-03 19:52:07 +00:00
throw Exception { " Wrong columns number in block. " , ErrorCodes : : BAD_ARGUMENTS } ;
2020-01-05 13:59:49 +00:00
const auto & ids = std : : get < Attribute : : Container < UInt64 > > ( new_keys . values ) ;
2020-01-07 19:18:24 +00:00
auto & ids_buffer = std : : get < Attribute : : Container < UInt64 > > ( keys_buffer . values ) ;
2019-10-25 18:06:08 +00:00
2020-01-12 11:32:43 +00:00
if ( ! memory )
2020-01-12 14:23:32 +00:00
memory . emplace ( block_size , BUFFER_ALIGNMENT ) ;
2020-01-07 11:26:52 +00:00
if ( ! write_buffer )
2020-01-18 17:46:00 +00:00
{
2020-01-19 08:49:40 +00:00
write_buffer . emplace ( memory - > data ( ) + current_memory_block_id * block_size , block_size ) ;
2020-01-18 17:46:00 +00:00
// codec = CompressionCodecFactory::instance().get("NONE", std::nullopt);
// compressed_buffer.emplace(*write_buffer, codec);
// hashing_buffer.emplace(*compressed_buffer);
}
2019-10-25 18:06:08 +00:00
2020-01-11 16:38:43 +00:00
for ( size_t index = begin ; index < ids . size ( ) ; )
2020-01-06 20:38:32 +00:00
{
2020-01-11 16:38:43 +00:00
IndexAndMetadata index_and_metadata ;
2020-01-08 12:40:29 +00:00
index_and_metadata . index . setInMemory ( true ) ;
index_and_metadata . index . setBlockId ( current_memory_block_id ) ;
index_and_metadata . index . setAddressInBlock ( write_buffer - > offset ( ) ) ;
index_and_metadata . metadata = metadata [ index ] ;
2020-01-07 11:26:52 +00:00
2020-01-07 19:18:24 +00:00
bool flushed = false ;
2020-01-07 11:26:52 +00:00
for ( const auto & attribute : new_attributes )
{
// TODO:: переделать через столбцы + getDataAt
2020-01-07 14:59:03 +00:00
switch ( attribute . type )
{
2020-01-07 11:26:52 +00:00
# define DISPATCH(TYPE) \
case AttributeUnderlyingType : : ut # # TYPE : \
{ \
if ( sizeof ( TYPE ) > write_buffer - > available ( ) ) \
{ \
2020-01-19 08:49:40 +00:00
write_buffer . reset ( ) ; \
if ( + + current_memory_block_id = = write_buffer_size ) \
flush ( ) ; \
2020-01-07 19:18:24 +00:00
flushed = true ; \
2020-01-07 11:26:52 +00:00
continue ; \
} \
else \
{ \
const auto & values = std : : get < Attribute : : Container < TYPE > > ( attribute . values ) ; \
writeBinary ( values [ index ] , * write_buffer ) ; \
} \
} \
break ;
DISPATCH ( UInt8 )
DISPATCH ( UInt16 )
DISPATCH ( UInt32 )
DISPATCH ( UInt64 )
DISPATCH ( UInt128 )
DISPATCH ( Int8 )
DISPATCH ( Int16 )
DISPATCH ( Int32 )
DISPATCH ( Int64 )
DISPATCH ( Decimal32 )
DISPATCH ( Decimal64 )
DISPATCH ( Decimal128 )
DISPATCH ( Float32 )
DISPATCH ( Float64 )
# undef DISPATCH
2020-01-26 17:35:39 +00:00
case AttributeUnderlyingType : : utString :
2020-01-28 20:32:41 +00:00
{
2020-01-26 17:35:39 +00:00
LOG_DEBUG ( & Poco : : Logger : : get ( " kek " ) , " string write " ) ;
const auto & value = std : : get < Attribute : : Container < String > > ( attribute . values ) [ index ] ;
if ( sizeof ( UInt64 ) + value . size ( ) > write_buffer - > available ( ) )
{
write_buffer . reset ( ) ;
if ( + + current_memory_block_id = = write_buffer_size )
flush ( ) ;
flushed = true ;
continue ;
}
else
{
writeStringBinary ( value , * write_buffer ) ;
}
2020-01-28 20:32:41 +00:00
}
2020-01-26 17:35:39 +00:00
break ;
2020-01-07 11:26:52 +00:00
}
}
2020-01-07 14:59:03 +00:00
2020-01-07 19:18:24 +00:00
if ( ! flushed )
{
2020-01-11 16:38:43 +00:00
key_to_index_and_metadata [ ids [ index ] ] = index_and_metadata ;
2020-01-07 19:18:24 +00:00
ids_buffer . push_back ( ids [ index ] ) ;
+ + index ;
}
2020-01-19 08:49:40 +00:00
else if ( current_file_block_id < max_size ) // next block in write buffer or flushed to ssd
2020-01-12 11:32:43 +00:00
{
2020-01-19 08:49:40 +00:00
write_buffer . emplace ( memory - > data ( ) + current_memory_block_id * block_size , block_size ) ;
2020-01-12 11:32:43 +00:00
}
2020-01-19 08:49:40 +00:00
else // flushed to ssd, end of current file
2020-01-12 11:32:43 +00:00
{
memory . reset ( ) ;
2020-01-19 08:49:40 +00:00
return index - begin ;
2020-01-12 11:32:43 +00:00
}
2020-01-06 20:38:32 +00:00
}
2020-01-11 16:38:43 +00:00
return ids . size ( ) - begin ;
2019-10-25 18:06:08 +00:00
}
2020-01-01 17:40:46 +00:00
void CachePartition : : flush ( )
2019-10-25 18:06:08 +00:00
{
2020-01-05 13:59:49 +00:00
const auto & ids = std : : get < Attribute : : Container < UInt64 > > ( keys_buffer . values ) ;
2020-01-06 20:38:32 +00:00
if ( ids . empty ( ) )
return ;
2020-01-11 20:23:51 +00:00
Poco : : Logger : : get ( " paritiiton " ) . information ( " @@@@@@@@@@@@@@@@@@@@ FLUSH!!! " + std : : to_string ( file_id ) + " block: " + std : : to_string ( current_file_block_id ) ) ;
2020-01-04 15:04:16 +00:00
2020-01-07 14:59:03 +00:00
AIOContext aio_context { 1 } ;
2019-10-25 18:06:08 +00:00
2020-01-07 17:55:32 +00:00
iocb write_request { } ;
2020-01-07 14:59:03 +00:00
iocb * write_request_ptr { & write_request } ;
2020-01-04 15:04:16 +00:00
2020-01-07 14:59:03 +00:00
# if defined(__FreeBSD__)
write_request . aio . aio_lio_opcode = LIO_WRITE ;
write_request . aio . aio_fildes = fd ;
2020-01-12 11:32:43 +00:00
write_request . aio . aio_buf = reinterpret_cast < volatile void * > ( memory - > data ( ) ) ;
2020-01-12 14:23:32 +00:00
write_request . aio . aio_nbytes = block_size ;
write_request . aio . aio_offset = block_size * current_file_block_id ;
2020-01-07 14:59:03 +00:00
# else
write_request . aio_lio_opcode = IOCB_CMD_PWRITE ;
2020-01-07 17:55:32 +00:00
write_request . aio_fildes = fd ;
2020-01-12 11:32:43 +00:00
write_request . aio_buf = reinterpret_cast < UInt64 > ( memory - > data ( ) ) ;
2020-01-19 08:49:40 +00:00
write_request . aio_nbytes = block_size * write_buffer_size ;
2020-01-12 14:23:32 +00:00
write_request . aio_offset = block_size * current_file_block_id ;
2020-01-07 14:59:03 +00:00
# endif
2020-01-04 15:04:16 +00:00
2020-01-07 14:59:03 +00:00
Poco : : Logger : : get ( " try: " ) . information ( " offset: " + std : : to_string ( write_request . aio_offset ) + " nbytes: " + std : : to_string ( write_request . aio_nbytes ) ) ;
2020-01-04 15:04:16 +00:00
2020-01-07 14:59:03 +00:00
while ( io_submit ( aio_context . ctx , 1 , & write_request_ptr ) < 0 )
{
if ( errno ! = EINTR )
throw Exception ( " Cannot submit request for asynchronous IO on file " + path + BIN_FILE_EXT , ErrorCodes : : CANNOT_IO_SUBMIT ) ;
}
2020-01-04 15:04:16 +00:00
2020-01-07 14:59:03 +00:00
CurrentMetrics : : Increment metric_increment_write { CurrentMetrics : : Write } ;
io_event event ;
while ( io_getevents ( aio_context . ctx , 1 , 1 , & event , nullptr ) < 0 )
{
if ( errno ! = EINTR )
throw Exception ( " Failed to wait for asynchronous IO completion on file " + path + BIN_FILE_EXT , ErrorCodes : : CANNOT_IO_GETEVENTS ) ;
2019-10-25 18:06:08 +00:00
}
2020-01-07 14:59:03 +00:00
// Unpoison the memory returned from an uninstrumented system function.
__msan_unpoison ( & event , sizeof ( event ) ) ;
ssize_t bytes_written ;
# if defined(__FreeBSD__)
bytes_written = aio_return ( reinterpret_cast < struct aiocb * > ( event . udata ) ) ;
# else
bytes_written = event . res ;
# endif
ProfileEvents : : increment ( ProfileEvents : : WriteBufferAIOWrite ) ;
ProfileEvents : : increment ( ProfileEvents : : WriteBufferAIOWriteBytes , bytes_written ) ;
if ( bytes_written ! = static_cast < decltype ( bytes_written ) > ( write_request . aio_nbytes ) )
throw Exception ( " Not all data was written for asynchronous IO on file " + path + BIN_FILE_EXT + " . returned: " + std : : to_string ( bytes_written ) , ErrorCodes : : AIO_WRITE_ERROR ) ;
2020-01-07 17:55:32 +00:00
if ( : : fsync ( fd ) < 0 )
throwFromErrnoWithPath ( " Cannot fsync " + path + BIN_FILE_EXT , path + BIN_FILE_EXT , ErrorCodes : : CANNOT_FSYNC ) ;
2019-10-25 18:06:08 +00:00
2020-01-04 15:04:16 +00:00
/// commit changes in index
for ( size_t row = 0 ; row < ids . size ( ) ; + + row )
2020-01-06 20:38:32 +00:00
{
2020-01-19 08:49:40 +00:00
auto & index = key_to_index_and_metadata [ ids [ row ] ] . index ;
2020-01-19 10:54:36 +00:00
if ( index . inMemory ( ) ) // Row can be inserted in the buffer twice, so we need to move to ssd only the last index.
2020-01-19 08:49:40 +00:00
{
index . setInMemory ( false ) ;
index . setBlockId ( current_file_block_id + index . getBlockId ( ) ) ;
}
2020-01-06 20:38:32 +00:00
}
2020-01-04 15:04:16 +00:00
2020-01-19 08:49:40 +00:00
current_file_block_id + = write_buffer_size ;
current_memory_block_id = 0 ;
2020-01-07 14:59:03 +00:00
2020-01-04 15:04:16 +00:00
/// clear buffer
2020-01-05 13:59:49 +00:00
std : : visit ( [ ] ( auto & attr ) { attr . clear ( ) ; } , keys_buffer . values ) ;
2019-10-25 18:06:08 +00:00
}
2020-02-01 18:13:43 +00:00
template < typename Out , typename GetDefault >
2020-01-05 20:31:25 +00:00
void CachePartition : : getValue ( const size_t attribute_index , const PaddedPODArray < UInt64 > & ids ,
2020-02-01 18:13:43 +00:00
ResultArrayType < Out > & out , std : : vector < bool > & found , GetDefault & get_default ,
2020-02-01 10:12:35 +00:00
std : : chrono : : system_clock : : time_point now ) const
2020-01-29 18:51:09 +00:00
{
auto set_value = [ & ] ( const size_t index , ReadBuffer & buf )
{
ignoreFromBufferToIndex ( attribute_index , buf ) ;
readBinary ( out [ index ] , buf ) ;
} ;
2020-02-01 18:13:43 +00:00
auto set_default = [ & ] ( const size_t index )
{
out [ index ] = get_default ( index ) ;
} ;
getImpl ( ids , set_value , set_default , found , now ) ;
2020-01-29 18:51:09 +00:00
}
void CachePartition : : getString ( const size_t attribute_index , const PaddedPODArray < UInt64 > & ids ,
2020-02-01 18:13:43 +00:00
StringRefs & refs , ArenaWithFreeLists & arena , std : : vector < bool > & found , std : : vector < size_t > & default_ids ,
std : : chrono : : system_clock : : time_point now ) const
2020-01-29 18:51:09 +00:00
{
2020-02-01 10:12:35 +00:00
auto set_value = [ & ] ( const size_t index , ReadBuffer & buf )
2020-01-29 18:51:09 +00:00
{
ignoreFromBufferToIndex ( attribute_index , buf ) ;
size_t size = 0 ;
readVarUInt ( size , buf ) ;
2020-02-01 10:12:35 +00:00
char * string_ptr = arena . alloc ( size ) ;
memcpy ( string_ptr , buf . position ( ) , size ) ;
refs [ index ] . data = string_ptr ;
refs [ index ] . size = size ;
2020-01-29 18:51:09 +00:00
} ;
2020-02-01 18:13:43 +00:00
auto set_default = [ & ] ( const size_t index )
{
default_ids . push_back ( index ) ;
} ;
getImpl ( ids , set_value , set_default , found , now ) ;
2020-01-29 18:51:09 +00:00
}
2020-02-01 18:13:43 +00:00
template < typename SetFunc , typename SetDefault >
void CachePartition : : getImpl ( const PaddedPODArray < UInt64 > & ids , SetFunc & set , SetDefault & set_default ,
std : : vector < bool > & found , std : : chrono : : system_clock : : time_point now ) const
2020-01-03 19:52:07 +00:00
{
2020-01-09 19:34:03 +00:00
std : : shared_lock lock ( rw_lock ) ;
2020-01-06 20:38:32 +00:00
PaddedPODArray < Index > indices ( ids . size ( ) ) ;
2020-01-05 17:05:49 +00:00
for ( size_t i = 0 ; i < ids . size ( ) ; + + i )
{
2020-01-11 11:19:12 +00:00
if ( found [ i ] )
2020-01-08 12:40:29 +00:00
{
indices [ i ] . setNotExists ( ) ;
}
2020-01-11 11:19:12 +00:00
else if ( auto it = key_to_index_and_metadata . find ( ids [ i ] ) ;
2020-02-01 10:12:35 +00:00
it ! = std : : end ( key_to_index_and_metadata ) & & it - > second . metadata . expiresAt ( ) > now )
2020-01-05 20:31:25 +00:00
{
2020-02-01 18:13:43 +00:00
if ( unlikely ( it - > second . metadata . isDefault ( ) ) )
{
indices [ i ] . setNotExists ( ) ;
set_default ( i ) ;
}
else
indices [ i ] = it - > second . index ;
2020-01-11 11:19:12 +00:00
found [ i ] = true ;
2020-01-05 20:31:25 +00:00
}
else
{
2020-01-11 11:19:12 +00:00
indices [ i ] . setNotExists ( ) ;
2020-01-05 20:31:25 +00:00
}
2020-01-05 17:05:49 +00:00
}
2020-01-07 19:18:24 +00:00
2020-01-29 18:51:09 +00:00
getValueFromMemory ( indices , set ) ;
getValueFromStorage ( indices , set ) ;
2020-01-03 19:52:07 +00:00
}
2020-01-26 17:35:39 +00:00
template < typename SetFunc >
2020-01-28 20:32:41 +00:00
void CachePartition : : getValueFromMemory ( const PaddedPODArray < Index > & indices , SetFunc & set ) const
2020-01-03 19:52:07 +00:00
{
2020-01-05 20:31:25 +00:00
for ( size_t i = 0 ; i < indices . size ( ) ; + + i )
{
const auto & index = indices [ i ] ;
2020-01-06 20:38:32 +00:00
if ( index . exists ( ) & & index . inMemory ( ) )
2020-01-05 20:31:25 +00:00
{
2020-01-19 08:49:40 +00:00
const size_t offset = index . getBlockId ( ) * block_size + index . getAddressInBlock ( ) ;
2020-01-07 11:26:52 +00:00
2020-01-19 08:49:40 +00:00
ReadBufferFromMemory read_buffer ( memory - > data ( ) + offset , block_size * write_buffer_size - offset ) ;
2020-01-26 17:35:39 +00:00
set ( i , read_buffer ) ;
2020-01-05 20:31:25 +00:00
}
}
}
2020-01-26 17:35:39 +00:00
template < typename SetFunc >
2020-01-28 20:32:41 +00:00
void CachePartition : : getValueFromStorage ( const PaddedPODArray < Index > & indices , SetFunc & set ) const
2020-01-05 20:31:25 +00:00
{
2020-01-07 17:55:32 +00:00
std : : vector < std : : pair < Index , size_t > > index_to_out ;
2020-01-06 20:38:32 +00:00
for ( size_t i = 0 ; i < indices . size ( ) ; + + i )
{
const auto & index = indices [ i ] ;
if ( index . exists ( ) & & ! index . inMemory ( ) )
2020-01-07 17:55:32 +00:00
index_to_out . emplace_back ( index , i ) ;
2020-01-06 20:38:32 +00:00
}
if ( index_to_out . empty ( ) )
return ;
2020-01-07 17:55:32 +00:00
/// sort by (block_id, offset_in_block)
2020-01-06 20:38:32 +00:00
std : : sort ( std : : begin ( index_to_out ) , std : : end ( index_to_out ) ) ;
2020-01-18 13:21:07 +00:00
Memory read_buffer ( block_size * read_buffer_size , BUFFER_ALIGNMENT ) ;
2020-01-06 20:38:32 +00:00
2020-01-07 17:55:32 +00:00
std : : vector < iocb > requests ;
std : : vector < iocb * > pointers ;
std : : vector < std : : vector < size_t > > blocks_to_indices ;
requests . reserve ( index_to_out . size ( ) ) ;
pointers . reserve ( index_to_out . size ( ) ) ;
blocks_to_indices . reserve ( index_to_out . size ( ) ) ;
2020-01-06 20:38:32 +00:00
for ( size_t i = 0 ; i < index_to_out . size ( ) ; + + i )
{
2020-01-07 17:55:32 +00:00
if ( ! requests . empty ( ) & &
2020-01-12 14:23:32 +00:00
static_cast < size_t > ( requests . back ( ) . aio_offset ) = = index_to_out [ i ] . first . getBlockId ( ) * block_size )
2020-01-07 17:55:32 +00:00
{
blocks_to_indices . back ( ) . push_back ( i ) ;
continue ;
}
iocb request { } ;
2020-01-06 20:38:32 +00:00
# if defined(__FreeBSD__)
request . aio . aio_lio_opcode = LIO_READ ;
2020-01-07 17:55:32 +00:00
request . aio . aio_fildes = fd ;
request . aio . aio_buf = reinterpret_cast < volatile void * > (
2020-01-08 17:10:37 +00:00
reinterpret_cast < UInt64 > ( read_buffer . data ( ) ) + SSD_BLOCK_SIZE * ( requests . size ( ) % READ_BUFFER_SIZE_BLOCKS ) ) ;
2020-01-07 17:55:32 +00:00
request . aio . aio_nbytes = SSD_BLOCK_SIZE ;
2020-01-06 20:38:32 +00:00
request . aio . aio_offset = index_to_out [ i ] . first ;
2020-01-08 12:40:29 +00:00
request . aio_data = requests . size ( ) ;
2020-01-06 20:38:32 +00:00
# else
2020-01-07 17:55:32 +00:00
request . aio_lio_opcode = IOCB_CMD_PREAD ;
request . aio_fildes = fd ;
2020-01-12 14:23:32 +00:00
request . aio_buf = reinterpret_cast < UInt64 > ( read_buffer . data ( ) ) + block_size * ( requests . size ( ) % read_buffer_size ) ;
request . aio_nbytes = block_size ;
request . aio_offset = index_to_out [ i ] . first . getBlockId ( ) * block_size ;
2020-01-07 19:18:24 +00:00
request . aio_data = requests . size ( ) ;
2020-01-06 20:38:32 +00:00
# endif
2020-01-07 17:55:32 +00:00
requests . push_back ( request ) ;
pointers . push_back ( & requests . back ( ) ) ;
blocks_to_indices . emplace_back ( ) ;
blocks_to_indices . back ( ) . push_back ( i ) ;
2020-01-06 20:38:32 +00:00
}
2020-01-12 14:23:32 +00:00
AIOContext aio_context ( read_buffer_size ) ;
2020-01-06 20:38:32 +00:00
2020-01-07 17:55:32 +00:00
std : : vector < bool > processed ( requests . size ( ) , false ) ;
std : : vector < io_event > events ( requests . size ( ) ) ;
2020-01-19 10:54:36 +00:00
for ( auto & event : events )
event . res = - 1 ; // TODO: remove
2020-01-06 20:38:32 +00:00
2020-01-07 17:55:32 +00:00
size_t to_push = 0 ;
size_t to_pop = 0 ;
while ( to_pop < requests . size ( ) )
2020-01-06 20:38:32 +00:00
{
2020-01-07 17:55:32 +00:00
/// get io tasks from previous iteration
size_t popped = 0 ;
while ( to_pop < to_push & & ( popped = io_getevents ( aio_context . ctx , to_push - to_pop , to_push - to_pop , & events [ to_pop ] , nullptr ) ) < 0 )
2020-01-06 20:38:32 +00:00
{
if ( errno ! = EINTR )
2020-01-19 10:54:36 +00:00
throwFromErrno ( " io_getevents: Failed to get an event for asynchronous IO " , ErrorCodes : : CANNOT_IO_GETEVENTS ) ;
2020-01-06 20:38:32 +00:00
}
2020-01-07 17:55:32 +00:00
for ( size_t i = to_pop ; i < to_pop + popped ; + + i )
2020-01-06 20:38:32 +00:00
{
2020-01-07 17:55:32 +00:00
const auto request_id = events [ i ] . data ;
const auto & request = requests [ request_id ] ;
if ( events [ i ] . res ! = static_cast < ssize_t > ( request . aio_nbytes ) )
2020-01-18 11:47:58 +00:00
throw Exception ( " AIO failed to read file " + path + BIN_FILE_EXT + " . " +
" request_id= " + std : : to_string ( request . aio_data ) + " , aio_nbytes= " + std : : to_string ( request . aio_nbytes ) + " , aio_offset= " + std : : to_string ( request . aio_offset ) +
" returned: " + std : : to_string ( events [ i ] . res ) , ErrorCodes : : AIO_WRITE_ERROR ) ;
2020-01-07 17:55:32 +00:00
for ( const size_t idx : blocks_to_indices [ request_id ] )
{
const auto & [ file_index , out_index ] = index_to_out [ idx ] ;
2020-01-18 13:21:07 +00:00
ReadBufferFromMemory buf (
2020-01-07 17:55:32 +00:00
reinterpret_cast < char * > ( request . aio_buf ) + file_index . getAddressInBlock ( ) ,
2020-01-12 14:23:32 +00:00
block_size - file_index . getAddressInBlock ( ) ) ;
2020-01-28 20:32:41 +00:00
set ( out_index , buf ) ;
2020-01-07 17:55:32 +00:00
}
processed [ request_id ] = true ;
2020-01-06 20:38:32 +00:00
}
2020-01-07 17:55:32 +00:00
while ( to_pop < requests . size ( ) & & processed [ to_pop ] )
+ + to_pop ;
2020-01-07 11:26:52 +00:00
2020-01-07 17:55:32 +00:00
/// add new io tasks
2020-01-12 14:23:32 +00:00
const size_t new_tasks_count = std : : min ( read_buffer_size - ( to_push - to_pop ) , requests . size ( ) - to_push ) ;
2020-01-07 17:55:32 +00:00
size_t pushed = 0 ;
while ( new_tasks_count > 0 & & ( pushed = io_submit ( aio_context . ctx , new_tasks_count , & pointers [ to_push ] ) ) < 0 )
{
if ( errno ! = EINTR )
throwFromErrno ( " io_submit: Failed to submit a request for asynchronous IO " , ErrorCodes : : CANNOT_IO_SUBMIT ) ;
}
to_push + = pushed ;
2020-01-07 11:26:52 +00:00
}
}
2020-01-06 20:38:32 +00:00
2020-01-29 18:51:09 +00:00
void CachePartition : : ignoreFromBufferToIndex ( const size_t attribute_index , ReadBuffer & buf ) const
2020-01-07 11:26:52 +00:00
{
for ( size_t i = 0 ; i < attribute_index ; + + i )
{
2020-01-08 17:52:13 +00:00
switch ( attributes_structure [ i ] )
2020-01-06 20:38:32 +00:00
{
2020-01-07 11:26:52 +00:00
# define DISPATCH(TYPE) \
2020-01-26 17:35:39 +00:00
case AttributeUnderlyingType : : ut # # TYPE : \
2020-01-27 19:29:23 +00:00
buf . ignore ( sizeof ( TYPE ) ) ; \
2020-01-26 17:35:39 +00:00
break ;
2020-01-06 20:38:32 +00:00
DISPATCH ( UInt8 )
DISPATCH ( UInt16 )
DISPATCH ( UInt32 )
DISPATCH ( UInt64 )
DISPATCH ( UInt128 )
DISPATCH ( Int8 )
DISPATCH ( Int16 )
DISPATCH ( Int32 )
DISPATCH ( Int64 )
DISPATCH ( Decimal32 )
DISPATCH ( Decimal64 )
DISPATCH ( Decimal128 )
DISPATCH ( Float32 )
DISPATCH ( Float64 )
# undef DISPATCH
2020-01-26 17:35:39 +00:00
case AttributeUnderlyingType : : utString :
2020-01-28 20:32:41 +00:00
{
2020-01-26 17:35:39 +00:00
size_t size = 0 ;
readVarUInt ( size , buf ) ;
buf . ignore ( size ) ;
2020-01-28 20:32:41 +00:00
}
2020-01-26 17:35:39 +00:00
break ;
2020-01-06 20:38:32 +00:00
}
}
2020-01-03 19:52:07 +00:00
}
2020-01-11 11:19:12 +00:00
void CachePartition : : has ( const PaddedPODArray < UInt64 > & ids , ResultArrayType < UInt8 > & out , std : : chrono : : system_clock : : time_point now ) const
2020-01-05 20:31:25 +00:00
{
2020-01-09 19:34:03 +00:00
std : : shared_lock lock ( rw_lock ) ;
2020-01-05 20:31:25 +00:00
for ( size_t i = 0 ; i < ids . size ( ) ; + + i )
2020-01-06 20:38:32 +00:00
{
2020-01-08 12:40:29 +00:00
auto it = key_to_index_and_metadata . find ( ids [ i ] ) ;
2020-01-11 11:19:12 +00:00
if ( it = = std : : end ( key_to_index_and_metadata ) | | it - > second . metadata . expiresAt ( ) < = now )
2020-01-08 12:40:29 +00:00
{
2020-01-11 20:23:51 +00:00
out [ i ] = HAS_NOT_FOUND ;
2020-01-06 20:38:32 +00:00
}
else
{
2020-01-08 12:40:29 +00:00
out [ i ] = ! it - > second . metadata . isDefault ( ) ;
2020-01-06 20:38:32 +00:00
}
}
2020-01-05 20:31:25 +00:00
}
2020-01-11 16:38:43 +00:00
size_t CachePartition : : getId ( ) const
{
return file_id ;
}
2020-01-18 17:46:00 +00:00
double CachePartition : : getLoadFactor ( ) const
{
std : : shared_lock lock ( rw_lock ) ;
return static_cast < double > ( current_file_block_id ) / max_size ;
}
size_t CachePartition : : getElementCount ( ) const
{
std : : shared_lock lock ( rw_lock ) ;
return key_to_index_and_metadata . size ( ) ;
}
2020-01-18 11:47:58 +00:00
/// Collects the keys that are neither flagged as default nor expired at `now`.
PaddedPODArray<CachePartition::Key> CachePartition::getCachedIds(const std::chrono::system_clock::time_point now) const
{
    const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs};

    PaddedPODArray<Key> cached_keys;
    for (const auto & [key, entry] : key_to_index_and_metadata)
    {
        if (entry.metadata.isDefault())
            continue;
        if (!(entry.metadata.expiresAt() > now))
            continue;
        cached_keys.push_back(key);
    }
    return cached_keys;
}
2020-01-12 11:32:43 +00:00
void CachePartition : : remove ( )
{
std : : unique_lock lock ( rw_lock ) ;
2020-01-19 08:49:40 +00:00
//Poco::File(path + BIN_FILE_EXT).remove();
2020-01-12 12:29:42 +00:00
//std::filesystem::remove(std::filesystem::path(path + BIN_FILE_EXT));
2020-01-12 11:32:43 +00:00
}
2020-01-08 19:41:05 +00:00
/// Stores the storage configuration and obtains the logger; no partition is created here.
CacheStorage::CacheStorage(
    const AttributeTypes & attributes_structure_,
    const std::string & path_,
    const size_t max_partitions_count_,
    const size_t partition_size_,
    const size_t block_size_,
    const size_t read_buffer_size_,
    const size_t write_buffer_size_)
    : attributes_structure(attributes_structure_),
      path(path_),
      max_partitions_count(max_partitions_count_),
      partition_size(partition_size_),
      block_size(block_size_),
      read_buffer_size(read_buffer_size_),
      write_buffer_size(write_buffer_size_),
      log(&Poco::Logger::get(" CacheStorage "))
{
}
2020-01-12 12:29:42 +00:00
/// Moves every remaining partition to the delete queue and runs the garbage collector.
CacheStorage::~CacheStorage()
{
    std::unique_lock write_guard(rw_lock);
    partition_delete_queue.splice(partition_delete_queue.end(), partitions);
    collectGarbage();
}
2020-02-01 18:13:43 +00:00
template < typename Out , typename GetDefault >
2020-01-11 20:23:51 +00:00
void CacheStorage : : getValue ( const size_t attribute_index , const PaddedPODArray < UInt64 > & ids ,
ResultArrayType < Out > & out , std : : unordered_map < Key , std : : vector < size_t > > & not_found ,
2020-02-01 18:13:43 +00:00
GetDefault & get_default , std : : chrono : : system_clock : : time_point now ) const
2020-01-11 20:23:51 +00:00
{
std : : vector < bool > found ( ids . size ( ) , false ) ;
2020-01-18 17:46:00 +00:00
{
std : : shared_lock lock ( rw_lock ) ;
for ( auto & partition : partitions )
2020-02-01 18:13:43 +00:00
partition - > getValue < Out > ( attribute_index , ids , out , found , get_default , now ) ;
2020-01-18 17:46:00 +00:00
}
2020-02-01 18:13:43 +00:00
for ( size_t i = 0 ; i < ids . size ( ) ; + + i )
if ( ! found [ i ] )
not_found [ ids [ i ] ] . push_back ( i ) ;
2020-01-18 17:46:00 +00:00
query_count . fetch_add ( ids . size ( ) , std : : memory_order_relaxed ) ;
hit_count . fetch_add ( ids . size ( ) - not_found . size ( ) , std : : memory_order_release ) ;
2020-01-11 20:23:51 +00:00
}
2020-01-29 18:51:09 +00:00
void CacheStorage : : getString ( const size_t attribute_index , const PaddedPODArray < UInt64 > & ids ,
2020-02-01 10:12:35 +00:00
StringRefs & refs , ArenaWithFreeLists & arena , std : : unordered_map < Key , std : : vector < size_t > > & not_found ,
2020-02-01 18:13:43 +00:00
std : : vector < size_t > & default_ids , std : : chrono : : system_clock : : time_point now ) const
2020-01-29 18:51:09 +00:00
{
std : : vector < bool > found ( ids . size ( ) , false ) ;
{
std : : shared_lock lock ( rw_lock ) ;
for ( auto & partition : partitions )
2020-02-01 18:13:43 +00:00
partition - > getString ( attribute_index , ids , refs , arena , found , default_ids , now ) ;
2020-01-29 18:51:09 +00:00
}
2020-02-01 18:13:43 +00:00
for ( size_t i = 0 ; i < ids . size ( ) ; + + i )
if ( ! found [ i ] )
not_found [ ids [ i ] ] . push_back ( i ) ;
2020-01-29 18:51:09 +00:00
query_count . fetch_add ( ids . size ( ) , std : : memory_order_relaxed ) ;
hit_count . fetch_add ( ids . size ( ) - not_found . size ( ) , std : : memory_order_release ) ;
}
2020-01-11 20:23:51 +00:00
void CacheStorage : : has ( const PaddedPODArray < UInt64 > & ids , ResultArrayType < UInt8 > & out ,
std : : unordered_map < Key , std : : vector < size_t > > & not_found , std : : chrono : : system_clock : : time_point now ) const
{
2020-01-18 17:46:00 +00:00
{
std : : shared_lock lock ( rw_lock ) ;
for ( auto & partition : partitions )
partition - > has ( ids , out , now ) ;
2020-01-11 20:23:51 +00:00
2020-01-18 17:46:00 +00:00
for ( size_t i = 0 ; i < ids . size ( ) ; + + i )
if ( out [ i ] = = HAS_NOT_FOUND )
not_found [ ids [ i ] ] . push_back ( i ) ;
}
query_count . fetch_add ( ids . size ( ) , std : : memory_order_relaxed ) ;
hit_count . fetch_add ( ids . size ( ) - not_found . size ( ) , std : : memory_order_release ) ;
2020-01-11 20:23:51 +00:00
}
2020-01-02 19:33:19 +00:00
template < typename PresentIdHandler , typename AbsentIdHandler >
2020-01-04 15:04:16 +00:00
void CacheStorage : : update ( DictionarySourcePtr & source_ptr , const std : : vector < Key > & requested_ids ,
2020-01-08 19:41:05 +00:00
PresentIdHandler & & on_updated , AbsentIdHandler & & on_id_not_found ,
2020-02-01 18:13:43 +00:00
const DictionaryLifetime lifetime )
2020-01-02 19:33:19 +00:00
{
2020-01-11 16:38:43 +00:00
auto append_block = [ this ] ( const CachePartition : : Attribute & new_keys ,
const CachePartition : : Attributes & new_attributes , const PaddedPODArray < CachePartition : : Metadata > & metadata )
{
size_t inserted = 0 ;
while ( inserted < metadata . size ( ) )
{
if ( ! partitions . empty ( ) )
inserted + = partitions . front ( ) - > appendBlock ( new_keys , new_attributes , metadata , inserted ) ;
if ( inserted < metadata . size ( ) )
{
partitions . emplace_front ( std : : make_unique < CachePartition > (
AttributeUnderlyingType : : utUInt64 , attributes_structure , path ,
2020-01-12 14:23:32 +00:00
( partitions . empty ( ) ? 0 : partitions . front ( ) - > getId ( ) + 1 ) ,
2020-01-19 08:49:40 +00:00
partition_size , block_size , read_buffer_size , write_buffer_size ) ) ;
2020-01-11 16:38:43 +00:00
}
}
2020-01-12 11:32:43 +00:00
collectGarbage ( ) ;
2020-01-11 16:38:43 +00:00
} ;
2020-01-02 19:33:19 +00:00
CurrentMetrics : : Increment metric_increment { CurrentMetrics : : DictCacheRequests } ;
ProfileEvents : : increment ( ProfileEvents : : DictCacheKeysRequested , requested_ids . size ( ) ) ;
std : : unordered_map < Key , UInt8 > remaining_ids { requested_ids . size ( ) } ;
for ( const auto id : requested_ids )
remaining_ids . insert ( { id , 0 } ) ;
const auto now = std : : chrono : : system_clock : : now ( ) ;
{
2020-01-11 20:56:27 +00:00
const ProfilingScopedWriteRWLock write_lock { rw_lock , ProfileEvents : : DictCacheLockWriteNs } ;
if ( now > backoff_end_time )
2020-01-02 19:33:19 +00:00
{
2020-01-11 20:56:27 +00:00
try
2020-01-02 19:33:19 +00:00
{
2020-01-11 20:56:27 +00:00
if ( update_error_count )
{
/// Recover after error: we have to clone the source here because
/// it could keep connections which should be reset after error.
source_ptr = source_ptr - > clone ( ) ;
}
2020-01-02 19:33:19 +00:00
2020-01-11 20:56:27 +00:00
Stopwatch watch ;
auto stream = source_ptr - > loadIds ( requested_ids ) ;
stream - > readPrefix ( ) ;
2020-01-02 19:33:19 +00:00
2020-01-11 20:56:27 +00:00
while ( const auto block = stream - > read ( ) )
{
const auto new_keys = std : : move ( createAttributesFromBlock ( block , 0 , { AttributeUnderlyingType : : utUInt64 } ) . front ( ) ) ;
const auto new_attributes = createAttributesFromBlock ( block , 1 , attributes_structure ) ;
2020-01-06 20:38:32 +00:00
2020-01-11 20:56:27 +00:00
const auto & ids = std : : get < CachePartition : : Attribute : : Container < UInt64 > > ( new_keys . values ) ;
2020-01-02 19:33:19 +00:00
2020-01-11 20:56:27 +00:00
PaddedPODArray < CachePartition : : Metadata > metadata ( ids . size ( ) ) ;
2020-01-08 12:40:29 +00:00
2020-01-11 20:56:27 +00:00
for ( const auto i : ext : : range ( 0 , ids . size ( ) ) )
{
std : : uniform_int_distribution < UInt64 > distribution { lifetime . min_sec , lifetime . max_sec } ;
metadata [ i ] . setExpiresAt ( now + std : : chrono : : seconds ( distribution ( rnd_engine ) ) ) ;
/// mark corresponding id as found
on_updated ( ids [ i ] , i , new_attributes ) ;
remaining_ids [ ids [ i ] ] = 1 ;
}
2020-01-02 19:33:19 +00:00
2020-01-11 20:56:27 +00:00
append_block ( new_keys , new_attributes , metadata ) ;
}
2020-01-02 19:33:19 +00:00
2020-01-11 20:56:27 +00:00
stream - > readSuffix ( ) ;
2020-01-02 19:33:19 +00:00
2020-01-11 20:56:27 +00:00
update_error_count = 0 ;
last_update_exception = std : : exception_ptr { } ;
backoff_end_time = std : : chrono : : system_clock : : time_point { } ;
2020-01-02 19:33:19 +00:00
2020-01-11 20:56:27 +00:00
ProfileEvents : : increment ( ProfileEvents : : DictCacheRequestTimeNs , watch . elapsed ( ) ) ;
}
catch ( . . . )
{
+ + update_error_count ;
last_update_exception = std : : current_exception ( ) ;
backoff_end_time = now + std : : chrono : : seconds ( calculateDurationWithBackoff ( rnd_engine , update_error_count ) ) ;
2020-01-02 19:33:19 +00:00
2020-01-11 20:56:27 +00:00
tryLogException ( last_update_exception , log ,
" Could not update ssd cache dictionary, next update is scheduled at " + ext : : to_string ( backoff_end_time ) ) ;
}
2020-01-02 19:33:19 +00:00
}
}
2020-02-01 18:13:43 +00:00
auto append_defaults = [ this ] ( const CachePartition : : Attribute & new_keys , const PaddedPODArray < CachePartition : : Metadata > & metadata )
2020-01-03 19:52:07 +00:00
{
2020-02-01 18:13:43 +00:00
size_t inserted = 0 ;
while ( inserted < metadata . size ( ) )
2020-01-03 19:52:07 +00:00
{
2020-02-01 18:13:43 +00:00
if ( ! partitions . empty ( ) )
inserted + = partitions . front ( ) - > appendDefaults ( new_keys , metadata , inserted ) ;
if ( inserted < metadata . size ( ) )
2020-01-03 19:52:07 +00:00
{
2020-02-01 18:13:43 +00:00
partitions . emplace_front ( std : : make_unique < CachePartition > (
AttributeUnderlyingType : : utUInt64 , attributes_structure , path ,
( partitions . empty ( ) ? 0 : partitions . front ( ) - > getId ( ) + 1 ) ,
partition_size , block_size , read_buffer_size , write_buffer_size ) ) ;
2020-01-03 19:52:07 +00:00
}
}
2020-02-01 18:13:43 +00:00
collectGarbage ( ) ;
} ;
size_t not_found_num = 0 , found_num = 0 ;
/// Check which ids have not been found and require setting null_value
CachePartition : : Attribute new_keys ;
new_keys . type = AttributeUnderlyingType : : utUInt64 ;
new_keys . values = CachePartition : : Attribute : : Container < UInt64 > ( ) ;
2020-01-08 12:40:29 +00:00
2020-01-10 19:19:03 +00:00
PaddedPODArray < CachePartition : : Metadata > metadata ;
2020-01-08 12:40:29 +00:00
2020-01-02 19:33:19 +00:00
for ( const auto & id_found_pair : remaining_ids )
{
if ( id_found_pair . second )
{
+ + found_num ;
continue ;
}
+ + not_found_num ;
const auto id = id_found_pair . first ;
if ( update_error_count )
{
/// TODO: юзать старые значения.
/// We don't have expired data for that `id` so all we can do is to rethrow `last_exception`.
std : : rethrow_exception ( last_update_exception ) ;
}
2020-01-05 13:59:49 +00:00
// Set key
2020-01-26 17:35:39 +00:00
std : : get < CachePartition : : Attribute : : Container < UInt64 > > ( new_keys . values ) . push_back ( id ) ;
2020-01-05 13:59:49 +00:00
2020-01-08 19:41:05 +00:00
std : : uniform_int_distribution < UInt64 > distribution { lifetime . min_sec , lifetime . max_sec } ;
2020-01-08 12:40:29 +00:00
metadata . emplace_back ( ) ;
metadata . back ( ) . setExpiresAt ( now + std : : chrono : : seconds ( distribution ( rnd_engine ) ) ) ;
metadata . back ( ) . setDefault ( ) ;
2020-01-02 19:33:19 +00:00
/// inform caller that the cell has not been found
on_id_not_found ( id ) ;
}
2020-01-08 12:40:29 +00:00
2020-01-11 20:56:27 +00:00
{
const ProfilingScopedWriteRWLock write_lock { rw_lock , ProfileEvents : : DictCacheLockWriteNs } ;
if ( not_found_num )
2020-02-01 18:13:43 +00:00
append_defaults ( new_keys , metadata ) ;
2020-01-11 20:56:27 +00:00
}
2020-01-02 19:33:19 +00:00
ProfileEvents : : increment ( ProfileEvents : : DictCacheKeysRequestedMiss , not_found_num ) ;
ProfileEvents : : increment ( ProfileEvents : : DictCacheKeysRequestedFound , found_num ) ;
ProfileEvents : : increment ( ProfileEvents : : DictCacheRequests ) ;
}
2020-01-18 11:47:58 +00:00
/// Concatenates the cached ids reported by every partition, all evaluated against
/// the same `now` timestamp taken before the shared lock is acquired.
PaddedPODArray<CachePartition::Key> CacheStorage::getCachedIds() const
{
    PaddedPODArray<Key> all_ids;

    const auto now = std::chrono::system_clock::now();

    std::shared_lock read_guard(rw_lock);
    for (auto & partition : partitions)
    {
        const auto partition_ids = partition->getCachedIds(now);
        array_insert:
        all_ids.insert(std::begin(partition_ids), std::end(partition_ids));
    }

    return all_ids;
}
2020-01-18 17:46:00 +00:00
double CacheStorage : : getLoadFactor ( ) const
{
double result = 0 ;
std : : shared_lock lock ( rw_lock ) ;
for ( const auto & partition : partitions )
result + = partition - > getLoadFactor ( ) ;
return result / partitions . size ( ) ;
}
size_t CacheStorage : : getElementCount ( ) const
{
size_t result = 0 ;
std : : shared_lock lock ( rw_lock ) ;
for ( const auto & partition : partitions )
result + = partition - > getElementCount ( ) ;
return result ;
}
2020-01-12 11:32:43 +00:00
void CacheStorage : : collectGarbage ( )
{
// add partitions to queue
2020-01-12 12:29:42 +00:00
while ( partitions . size ( ) > max_partitions_count )
2020-01-12 11:32:43 +00:00
{
2020-01-12 12:29:42 +00:00
partition_delete_queue . splice ( std : : end ( partition_delete_queue ) , partitions , std : : prev ( std : : end ( partitions ) ) ) ;
2020-01-12 11:32:43 +00:00
}
// drop unused partitions
while ( ! partition_delete_queue . empty ( ) & & partition_delete_queue . front ( ) . use_count ( ) = = 1 )
{
partition_delete_queue . front ( ) - > remove ( ) ;
partition_delete_queue . pop_front ( ) ;
}
}
2020-01-05 17:05:49 +00:00
/// Copies `structure.size()` consecutive columns of `block`, starting at `begin_column`,
/// into attribute containers: column `begin_column + i` becomes attribute `i` and is
/// interpreted according to `structure[i]`.
/// Numeric and decimal columns are copied with a single memcpy of the raw column data;
/// string columns are copied value by value.
CachePartition::Attributes CacheStorage::createAttributesFromBlock(
    const Block & block, const size_t begin_column, const std::vector<AttributeUnderlyingType> & structure)
{
    CachePartition::Attributes attributes;

    const auto columns = block.getColumns();
    for (size_t i = 0; i < structure.size(); ++i)
    {
        const auto & column = columns[i + begin_column];

        switch (structure[i])
        {
#define DISPATCH(TYPE) \
            case AttributeUnderlyingType::ut##TYPE: \
            { \
                CachePartition::Attribute::Container<TYPE> raw_values(column->size()); \
                memcpy(&raw_values[0], column->getRawData().data, sizeof(TYPE) * raw_values.size()); \
                attributes.emplace_back(); \
                auto & attribute = attributes.back(); \
                attribute.type = structure[i]; \
                attribute.values = std::move(raw_values); \
            } \
            break;

            DISPATCH(UInt8)
            DISPATCH(UInt16)
            DISPATCH(UInt32)
            DISPATCH(UInt64)
            DISPATCH(UInt128)
            DISPATCH(Int8)
            DISPATCH(Int16)
            DISPATCH(Int32)
            DISPATCH(Int64)
            DISPATCH(Decimal32)
            DISPATCH(Decimal64)
            DISPATCH(Decimal128)
            DISPATCH(Float32)
            DISPATCH(Float64)
#undef DISPATCH

            case AttributeUnderlyingType::utString:
            {
                attributes.emplace_back();

                const size_t rows = column->size();
                CachePartition::Attribute::Container<String> strings(rows);
                for (size_t j = 0; j < rows; ++j)
                {
                    const auto ref = column->getDataAt(j);
                    auto & target = strings[j];
                    target.resize(ref.size);
                    memcpy(target.data(), ref.data, ref.size);
                }

                auto & attribute = attributes.back();
                attribute.type = structure[i];
                attribute.values = std::move(strings);
            }
            break;
        }
    }

    return attributes;
}
2020-01-01 17:40:46 +00:00
/// Stores the configuration, builds the cache storage from the underlying types of the
/// dictionary attributes and prepares the per-attribute null values.
/// Throws UNSUPPORTED_METHOD when the source does not support selective loading.
SSDCacheDictionary::SSDCacheDictionary(
    const std::string & name_,
    const DictionaryStructure & dict_struct_,
    DictionarySourcePtr source_ptr_,
    const DictionaryLifetime dict_lifetime_,
    const std::string & path_,
    const size_t max_partitions_count_,
    const size_t partition_size_,
    const size_t block_size_,
    const size_t read_buffer_size_,
    const size_t write_buffer_size_)
    : name(name_),
      dict_struct(dict_struct_),
      source_ptr(std::move(source_ptr_)),
      dict_lifetime(dict_lifetime_),
      path(path_),
      max_partitions_count(max_partitions_count_),
      partition_size(partition_size_),
      block_size(block_size_),
      read_buffer_size(read_buffer_size_),
      write_buffer_size(write_buffer_size_),
      storage(
          ext::map<std::vector>(dict_struct.attributes, [](const auto & attribute) { return attribute.underlying_type; }),
          path, max_partitions_count, partition_size, block_size, read_buffer_size, write_buffer_size),
      log(&Poco::Logger::get(" SSDCacheDictionary "))
{
    const bool selective_load_supported = this->source_ptr->supportsSelectiveLoad();
    if (!selective_load_supported)
        throw Exception{name + " : source cannot be used with CacheDictionary ", ErrorCodes::UNSUPPORTED_METHOD};

    createAttributes();
}
# define DECLARE(TYPE) \
void SSDCacheDictionary : : get # # TYPE ( \
const std : : string & attribute_name , const PaddedPODArray < Key > & ids , ResultArrayType < TYPE > & out ) const \
{ \
const auto index = getAttributeIndex ( attribute_name ) ; \
checkAttributeType ( name , attribute_name , dict_struct . attributes [ index ] . underlying_type , AttributeUnderlyingType : : ut # # TYPE ) ; \
2020-01-08 19:41:05 +00:00
const auto null_value = std : : get < TYPE > ( null_values [ index ] ) ; \
2020-01-01 17:40:46 +00:00
getItemsNumberImpl < TYPE , TYPE > ( \
2020-01-05 13:59:49 +00:00
index , \
2020-01-04 15:04:16 +00:00
ids , \
out , \
[ & ] ( const size_t ) { return null_value ; } ) ; \
2020-01-01 17:40:46 +00:00
}
2020-01-05 13:59:49 +00:00
2020-01-01 17:40:46 +00:00
DECLARE ( UInt8 )
DECLARE ( UInt16 )
DECLARE ( UInt32 )
DECLARE ( UInt64 )
DECLARE ( UInt128 )
DECLARE ( Int8 )
DECLARE ( Int16 )
DECLARE ( Int32 )
DECLARE ( Int64 )
DECLARE ( Float32 )
DECLARE ( Float64 )
DECLARE ( Decimal32 )
DECLARE ( Decimal64 )
DECLARE ( Decimal128 )
# undef DECLARE
# define DECLARE(TYPE) \
void SSDCacheDictionary : : get # # TYPE ( \
const std : : string & attribute_name , \
const PaddedPODArray < Key > & ids , \
const PaddedPODArray < TYPE > & def , \
ResultArrayType < TYPE > & out ) const \
{ \
const auto index = getAttributeIndex ( attribute_name ) ; \
checkAttributeType ( name , attribute_name , dict_struct . attributes [ index ] . underlying_type , AttributeUnderlyingType : : ut # # TYPE ) ; \
getItemsNumberImpl < TYPE , TYPE > ( \
2020-01-05 13:59:49 +00:00
index , \
2020-01-01 17:40:46 +00:00
ids , \
out , \
[ & ] ( const size_t row ) { return def [ row ] ; } ) ; \
}
DECLARE ( UInt8 )
DECLARE ( UInt16 )
DECLARE ( UInt32 )
DECLARE ( UInt64 )
DECLARE ( UInt128 )
DECLARE ( Int8 )
DECLARE ( Int16 )
DECLARE ( Int32 )
DECLARE ( Int64 )
DECLARE ( Float32 )
DECLARE ( Float64 )
DECLARE ( Decimal32 )
DECLARE ( Decimal64 )
DECLARE ( Decimal128 )
# undef DECLARE
# define DECLARE(TYPE) \
void SSDCacheDictionary : : get # # TYPE ( \
const std : : string & attribute_name , \
const PaddedPODArray < Key > & ids , \
const TYPE def , \
ResultArrayType < TYPE > & out ) const \
{ \
const auto index = getAttributeIndex ( attribute_name ) ; \
checkAttributeType ( name , attribute_name , dict_struct . attributes [ index ] . underlying_type , AttributeUnderlyingType : : ut # # TYPE ) ; \
getItemsNumberImpl < TYPE , TYPE > ( \
2020-01-05 13:59:49 +00:00
index , \
2020-01-01 17:40:46 +00:00
ids , \
out , \
[ & ] ( const size_t ) { return def ; } ) ; \
}
DECLARE ( UInt8 )
DECLARE ( UInt16 )
DECLARE ( UInt32 )
DECLARE ( UInt64 )
DECLARE ( UInt128 )
DECLARE ( Int8 )
DECLARE ( Int16 )
DECLARE ( Int32 )
DECLARE ( Int64 )
DECLARE ( Float32 )
DECLARE ( Float64 )
DECLARE ( Decimal32 )
DECLARE ( Decimal64 )
DECLARE ( Decimal128 )
# undef DECLARE
template < typename AttributeType , typename OutputType , typename DefaultGetter >
void SSDCacheDictionary : : getItemsNumberImpl (
2020-01-05 13:59:49 +00:00
const size_t attribute_index , const PaddedPODArray < Key > & ids , ResultArrayType < OutputType > & out , DefaultGetter & & get_default ) const
2020-01-01 17:40:46 +00:00
{
2020-01-08 12:40:29 +00:00
const auto now = std : : chrono : : system_clock : : now ( ) ;
2020-01-01 17:40:46 +00:00
std : : unordered_map < Key , std : : vector < size_t > > not_found_ids ;
2020-02-01 18:13:43 +00:00
storage . getValue < OutputType > ( attribute_index , ids , out , not_found_ids , get_default , now ) ;
2020-01-01 17:40:46 +00:00
if ( not_found_ids . empty ( ) )
return ;
std : : vector < Key > required_ids ( not_found_ids . size ( ) ) ;
2020-01-05 13:59:49 +00:00
std : : transform ( std : : begin ( not_found_ids ) , std : : end ( not_found_ids ) , std : : begin ( required_ids ) , [ ] ( const auto & pair ) { return pair . first ; } ) ;
2020-01-01 17:40:46 +00:00
2020-01-02 19:33:19 +00:00
storage . update (
source_ptr ,
2020-01-01 17:40:46 +00:00
required_ids ,
2020-01-04 15:04:16 +00:00
[ & ] ( const auto id , const auto row , const auto & new_attributes ) {
2020-01-03 19:52:07 +00:00
for ( const size_t out_row : not_found_ids [ id ] )
2020-01-26 17:35:39 +00:00
out [ out_row ] = std : : get < CachePartition : : Attribute : : Container < OutputType > > ( new_attributes [ attribute_index ] . values ) [ row ] ;
2020-01-01 17:40:46 +00:00
} ,
2020-01-05 13:59:49 +00:00
[ & ] ( const size_t id )
2020-01-01 17:40:46 +00:00
{
for ( const size_t row : not_found_ids [ id ] )
out [ row ] = get_default ( row ) ;
2020-01-08 19:41:05 +00:00
} ,
2020-02-01 18:13:43 +00:00
getLifetime ( ) ) ;
2020-01-01 17:40:46 +00:00
}
void SSDCacheDictionary : : getString ( const std : : string & attribute_name , const PaddedPODArray < Key > & ids , ColumnString * out ) const
{
2020-01-05 13:59:49 +00:00
const auto index = getAttributeIndex ( attribute_name ) ;
2020-01-08 19:41:05 +00:00
checkAttributeType ( name , attribute_name , dict_struct . attributes [ index ] . underlying_type , AttributeUnderlyingType : : utString ) ;
2020-01-01 17:40:46 +00:00
2020-01-08 19:41:05 +00:00
const auto null_value = StringRef { std : : get < String > ( null_values [ index ] ) } ;
2020-01-01 17:40:46 +00:00
2020-01-26 17:35:39 +00:00
getItemsStringImpl ( index , ids , out , [ & ] ( const size_t ) { return null_value ; } ) ;
2020-01-01 17:40:46 +00:00
}
void SSDCacheDictionary : : getString (
const std : : string & attribute_name , const PaddedPODArray < Key > & ids , const ColumnString * const def , ColumnString * const out ) const
{
2020-01-05 13:59:49 +00:00
const auto index = getAttributeIndex ( attribute_name ) ;
2020-01-08 19:41:05 +00:00
checkAttributeType ( name , attribute_name , dict_struct . attributes [ index ] . underlying_type , AttributeUnderlyingType : : utString ) ;
2020-01-01 17:40:46 +00:00
2020-01-26 17:35:39 +00:00
getItemsStringImpl ( index , ids , out , [ & ] ( const size_t row ) { return def - > getDataAt ( row ) ; } ) ;
2020-01-01 17:40:46 +00:00
}
void SSDCacheDictionary : : getString (
const std : : string & attribute_name , const PaddedPODArray < Key > & ids , const String & def , ColumnString * const out ) const
{
2020-01-05 13:59:49 +00:00
const auto index = getAttributeIndex ( attribute_name ) ;
2020-01-08 19:41:05 +00:00
checkAttributeType ( name , attribute_name , dict_struct . attributes [ index ] . underlying_type , AttributeUnderlyingType : : utString ) ;
2020-01-01 17:40:46 +00:00
2020-01-26 17:35:39 +00:00
getItemsStringImpl ( index , ids , out , [ & ] ( const size_t ) { return StringRef { def } ; } ) ;
2020-01-01 17:40:46 +00:00
}
template < typename DefaultGetter >
2020-01-26 17:35:39 +00:00
void SSDCacheDictionary : : getItemsStringImpl ( const size_t attribute_index , const PaddedPODArray < Key > & ids ,
2020-01-01 17:40:46 +00:00
ColumnString * out , DefaultGetter & & get_default ) const
{
2020-01-26 17:35:39 +00:00
const auto now = std : : chrono : : system_clock : : now ( ) ;
std : : unordered_map < Key , std : : vector < size_t > > not_found_ids ;
2020-02-01 10:12:35 +00:00
StringRefs refs ( ids . size ( ) ) ;
ArenaWithFreeLists string_arena ;
2020-02-01 18:13:43 +00:00
std : : vector < size_t > default_rows ;
storage . getString ( attribute_index , ids , refs , string_arena , not_found_ids , default_rows , now ) ;
std : : sort ( std : : begin ( default_rows ) , std : : end ( default_rows ) ) ;
2020-01-26 17:35:39 +00:00
if ( not_found_ids . empty ( ) )
{
2020-02-01 18:13:43 +00:00
size_t default_index = 0 ;
2020-02-01 10:12:35 +00:00
for ( size_t row = 0 ; row < ids . size ( ) ; + + row )
2020-02-01 18:13:43 +00:00
{
if ( unlikely ( default_index ! = default_rows . size ( ) & & default_rows [ default_index ] = = row ) )
{
auto to_insert = get_default ( row ) ;
out - > insertData ( to_insert . data , to_insert . size ) ;
+ + default_index ;
}
else
out - > insertData ( refs [ row ] . data , refs [ row ] . size ) ;
}
2020-01-26 17:35:39 +00:00
return ;
}
std : : vector < Key > required_ids ( not_found_ids . size ( ) ) ;
std : : transform ( std : : begin ( not_found_ids ) , std : : end ( not_found_ids ) , std : : begin ( required_ids ) , [ ] ( const auto & pair ) { return pair . first ; } ) ;
std : : unordered_map < Key , String > update_result ;
storage . update (
source_ptr ,
required_ids ,
[ & ] ( const auto id , const auto row , const auto & new_attributes )
{
update_result [ id ] = std : : get < CachePartition : : Attribute : : Container < String > > ( new_attributes [ attribute_index ] . values ) [ row ] ;
} ,
[ & ] ( const size_t ) { } ,
2020-02-01 18:13:43 +00:00
getLifetime ( ) ) ;
2020-01-26 17:35:39 +00:00
2020-02-01 18:13:43 +00:00
size_t default_index = 0 ;
2020-01-26 17:35:39 +00:00
for ( size_t row = 0 ; row < ids . size ( ) ; + + row )
{
const auto & id = ids [ row ] ;
2020-02-01 18:13:43 +00:00
if ( unlikely ( default_index ! = default_rows . size ( ) & & default_rows [ default_index ] = = row ) )
{
auto to_insert = get_default ( row ) ;
out - > insertData ( to_insert . data , to_insert . size ) ;
+ + default_index ;
}
else if ( auto it = not_found_ids . find ( id ) ; it = = std : : end ( not_found_ids ) )
2020-01-26 17:35:39 +00:00
{
2020-02-01 10:12:35 +00:00
out - > insertData ( refs [ row ] . data , refs [ row ] . size ) ;
2020-01-26 17:35:39 +00:00
}
2020-02-01 18:13:43 +00:00
else if ( auto it_update = update_result . find ( id ) ; it_update ! = std : : end ( update_result ) )
{
out - > insertData ( it_update - > second . data ( ) , it_update - > second . size ( ) ) ;
}
2020-01-26 17:35:39 +00:00
else
{
2020-02-01 18:13:43 +00:00
auto to_insert = get_default ( row ) ;
out - > insertData ( to_insert . data , to_insert . size ) ;
2020-01-26 17:35:39 +00:00
}
2020-01-28 20:32:41 +00:00
}
2020-01-01 17:40:46 +00:00
}
2020-01-08 12:40:29 +00:00
void SSDCacheDictionary : : has ( const PaddedPODArray < Key > & ids , PaddedPODArray < UInt8 > & out ) const
{
const auto now = std : : chrono : : system_clock : : now ( ) ;
std : : unordered_map < Key , std : : vector < size_t > > not_found_ids ;
storage . has ( ids , out , not_found_ids , now ) ;
if ( not_found_ids . empty ( ) )
return ;
std : : vector < Key > required_ids ( not_found_ids . size ( ) ) ;
std : : transform ( std : : begin ( not_found_ids ) , std : : end ( not_found_ids ) , std : : begin ( required_ids ) , [ ] ( const auto & pair ) { return pair . first ; } ) ;
storage . update (
source_ptr ,
required_ids ,
[ & ] ( const auto id , const auto , const auto & ) {
for ( const size_t out_row : not_found_ids [ id ] )
out [ out_row ] = true ;
} ,
[ & ] ( const size_t id )
{
for ( const size_t row : not_found_ids [ id ] )
out [ row ] = false ;
2020-01-08 19:41:05 +00:00
} ,
2020-02-01 18:13:43 +00:00
getLifetime ( ) ) ;
2020-01-08 12:40:29 +00:00
}
2020-01-18 11:47:58 +00:00
/// Builds a dictionary block input stream over the ids returned by `storage.getCachedIds()`.
BlockInputStreamPtr SSDCacheDictionary::getBlockInputStream(const Names & column_names, size_t max_block_size) const
{
    using StreamType = DictionaryBlockInputStream<SSDCacheDictionary, Key>;

    return std::make_shared<StreamType>(shared_from_this(), max_block_size, storage.getCachedIds(), column_names);
}
2020-01-01 17:40:46 +00:00
size_t SSDCacheDictionary : : getAttributeIndex ( const std : : string & attr_name ) const
{
auto it = attribute_index_by_name . find ( attr_name ) ;
if ( it = = std : : end ( attribute_index_by_name ) )
throw Exception { " Attribute ` " + name + " ` does not exist. " , ErrorCodes : : BAD_ARGUMENTS } ;
return it - > second ;
}
template < typename T >
2020-01-08 19:41:05 +00:00
AttributeValueVariant SSDCacheDictionary : : createAttributeNullValueWithTypeImpl ( const Field & null_value )
2020-01-01 17:40:46 +00:00
{
2020-01-08 19:41:05 +00:00
AttributeValueVariant var_null_value = static_cast < T > ( null_value . get < NearestFieldType < T > > ( ) ) ;
2020-01-01 17:40:46 +00:00
bytes_allocated + = sizeof ( T ) ;
2020-01-08 19:41:05 +00:00
return var_null_value ;
2020-01-01 17:40:46 +00:00
}
template < >
2020-01-08 19:41:05 +00:00
AttributeValueVariant SSDCacheDictionary : : createAttributeNullValueWithTypeImpl < String > ( const Field & null_value )
2020-01-01 17:40:46 +00:00
{
2020-01-08 19:41:05 +00:00
AttributeValueVariant var_null_value = null_value . get < String > ( ) ;
2020-01-01 17:40:46 +00:00
bytes_allocated + = sizeof ( StringRef ) ;
2020-01-08 19:41:05 +00:00
return var_null_value ;
2020-01-01 17:40:46 +00:00
}
2020-01-08 19:41:05 +00:00
/// Dispatches on the attribute's underlying type to the matching typed implementation.
/// Throws TYPE_MISMATCH when `type` is not one of the listed enumerators.
AttributeValueVariant SSDCacheDictionary::createAttributeNullValueWithType(const AttributeUnderlyingType type, const Field & null_value)
{
    switch (type)
    {
#define DISPATCH(TYPE) \
        case AttributeUnderlyingType::ut##TYPE: \
            return createAttributeNullValueWithTypeImpl<TYPE>(null_value);

        DISPATCH(UInt8)
        DISPATCH(UInt16)
        DISPATCH(UInt32)
        DISPATCH(UInt64)
        DISPATCH(UInt128)
        DISPATCH(Int8)
        DISPATCH(Int16)
        DISPATCH(Int32)
        DISPATCH(Int64)
        DISPATCH(Decimal32)
        DISPATCH(Decimal64)
        DISPATCH(Decimal128)
        DISPATCH(Float32)
        DISPATCH(Float64)
        DISPATCH(String)
#undef DISPATCH
    }

    const auto raw_type = static_cast<int>(type);
    throw Exception{" Unknown attribute type: " + std::to_string(raw_type), ErrorCodes::TYPE_MISMATCH};
}
void SSDCacheDictionary : : createAttributes ( )
{
2020-01-08 19:41:05 +00:00
null_values . reserve ( dict_struct . attributes . size ( ) ) ;
2020-01-01 17:40:46 +00:00
for ( size_t i = 0 ; i < dict_struct . attributes . size ( ) ; + + i )
{
const auto & attribute = dict_struct . attributes [ i ] ;
attribute_index_by_name . emplace ( attribute . name , i ) ;
2020-01-08 19:41:05 +00:00
null_values . push_back ( createAttributeNullValueWithType ( attribute . underlying_type , attribute . null_value ) ) ;
2020-01-01 17:40:46 +00:00
if ( attribute . hierarchical )
throw Exception { name + " : hierarchical attributes not supported for dictionary of type " + getTypeName ( ) ,
ErrorCodes : : TYPE_MISMATCH } ;
}
}
2019-10-25 18:06:08 +00:00
2020-01-05 13:59:49 +00:00
/// Registers the "ssd" layout in the dictionary factory.
///
/// The registered creator validates the dictionary structure and the settings read from
/// `<config_prefix>.layout.ssd.*`, then constructs an SSDCacheDictionary:
///  - max_partitions_count, block_size, partition_size, read_buffer_size and write_buffer_size
///    must be positive (defaults are used when a setting is absent);
///  - partition_size, read_buffer_size and write_buffer_size must be multiples of block_size
///    and are divided by block_size before being passed to the dictionary constructor;
///  - path has no default and must not be empty.
///
/// The creator throws Exception with UNSUPPORTED_METHOD when the structure defines a composite 'key',
/// and with BAD_ARGUMENTS for range_min/range_max or any invalid setting.
void registerDictionarySSDCache(DictionaryFactory & factory)
{
    auto create_layout = [](const std::string & name,
                            const DictionaryStructure & dict_struct,
                            const Poco::Util::AbstractConfiguration & config,
                            const std::string & config_prefix,
                            DictionarySourcePtr source_ptr) -> DictionaryPtr
    {
        if (dict_struct.key)
            throw Exception{name + ": 'key' is not supported for dictionary of layout 'ssd'", ErrorCodes::UNSUPPORTED_METHOD};

        if (dict_struct.range_min || dict_struct.range_max)
            throw Exception{name
                                + ": elements .structure.range_min and .structure.range_max should be defined only "
                                  "for a dictionary of layout 'range_hashed'",
                            ErrorCodes::BAD_ARGUMENTS};

        /// All settings of this layout live under "<config_prefix>.layout.ssd".
        const std::string ssd_prefix = config_prefix + ".layout.ssd";

        const auto max_partitions_count = config.getInt(ssd_prefix + ".max_partitions_count", DEFAULT_PARTITIONS_COUNT);
        if (max_partitions_count <= 0)
            throw Exception{name + ": dictionary of layout 'ssd' cannot have 0 (or less) max_partitions_count", ErrorCodes::BAD_ARGUMENTS};

        /// block_size is checked first: it is the divisor for every size setting below.
        const auto block_size = config.getInt(ssd_prefix + ".block_size", DEFAULT_SSD_BLOCK_SIZE);
        if (block_size <= 0)
            throw Exception{name + ": dictionary of layout 'ssd' cannot have 0 (or less) block_size", ErrorCodes::BAD_ARGUMENTS};

        const auto partition_size = config.getInt64(ssd_prefix + ".partition_size", DEFAULT_FILE_SIZE);
        if (partition_size <= 0)
            throw Exception{name + ": dictionary of layout 'ssd' cannot have 0 (or less) partition_size", ErrorCodes::BAD_ARGUMENTS};
        if (partition_size % block_size != 0)
            throw Exception{name + ": partition_size must be a multiple of block_size", ErrorCodes::BAD_ARGUMENTS};

        const auto read_buffer_size = config.getInt64(ssd_prefix + ".read_buffer_size", DEFAULT_READ_BUFFER_SIZE);
        if (read_buffer_size <= 0)
            throw Exception{name + ": dictionary of layout 'ssd' cannot have 0 (or less) read_buffer_size", ErrorCodes::BAD_ARGUMENTS};
        if (read_buffer_size % block_size != 0)
            throw Exception{name + ": read_buffer_size must be a multiple of block_size", ErrorCodes::BAD_ARGUMENTS};

        const auto write_buffer_size = config.getInt64(ssd_prefix + ".write_buffer_size", DEFAULT_WRITE_BUFFER_SIZE);
        if (write_buffer_size <= 0)
            throw Exception{name + ": dictionary of layout 'ssd' cannot have 0 (or less) write_buffer_size", ErrorCodes::BAD_ARGUMENTS};
        if (write_buffer_size % block_size != 0)
            throw Exception{name + ": write_buffer_size must be a multiple of block_size", ErrorCodes::BAD_ARGUMENTS};

        /// No default is supplied for the path, so the lookup itself fails when the element is missing;
        /// the explicit check below covers an element that is present but empty.
        const auto path = config.getString(ssd_prefix + ".path");
        if (path.empty())
            throw Exception{name + ": dictionary of layout 'ssd' cannot have empty path", ErrorCodes::BAD_ARGUMENTS};

        const DictionaryLifetime dict_lifetime{config, config_prefix + ".lifetime"};

        /// Size settings are handed over divided by block_size.
        return std::make_unique<SSDCacheDictionary>(
            name, dict_struct, std::move(source_ptr), dict_lifetime, path,
            max_partitions_count, partition_size / block_size, block_size,
            read_buffer_size / block_size, write_buffer_size / block_size);
    };

    /// NOTE(review): the third argument presumably marks the layout as not complex-key —
    /// confirm against DictionaryFactory::registerLayout.
    factory.registerLayout("ssd", create_layout, false);
}
2019-10-25 18:06:08 +00:00
}