2018-11-28 11:37:12 +00:00
# include "CacheDictionary.h"
2017-04-27 17:16:24 +00:00
# include <memory>
2021-02-16 21:33:02 +00:00
# include <ext/range.h>
# include <ext/size.h>
# include <ext/map.h>
# include <ext/chrono_io.h>
# include <Core/Defines.h>
2017-04-01 09:19:00 +00:00
# include <Common/BitHelpers.h>
2018-12-10 15:25:45 +00:00
# include <Common/CurrentMetrics.h>
2017-04-01 09:19:00 +00:00
# include <Common/HashTable/Hash.h>
2021-02-16 21:33:02 +00:00
# include <Common/HashTable/HashSet.h>
2017-04-08 01:32:05 +00:00
# include <Common/ProfileEvents.h>
2018-12-10 15:25:45 +00:00
# include <Common/ProfilingScopedRWLock.h>
2017-07-13 20:58:19 +00:00
# include <Common/typeid_cast.h>
2020-02-06 12:18:19 +00:00
# include <Common/setThreadName.h>
2021-02-16 21:33:02 +00:00
# include <IO/WriteBufferFromOStream.h>
# include <Dictionaries/DictionaryBlockInputStream.h>
# include <Dictionaries/CacheDictionaryStorage.h>
# include <Dictionaries/SSDCacheDictionaryStorage.h>
# include <Dictionaries/DictionaryFactory.h>
2020-04-20 02:31:21 +00:00
2017-04-08 01:32:05 +00:00
/// Profile event identifiers used by the cache dictionary.
/// They are only declared here (extern); the definitions live elsewhere.
namespace ProfileEvents
{
    /// Key-level counters.
    extern const Event DictCacheKeysRequested;
    extern const Event DictCacheKeysRequestedMiss;
    extern const Event DictCacheKeysRequestedFound;
    extern const Event DictCacheKeysExpired;
    extern const Event DictCacheKeysNotFound;
    extern const Event DictCacheKeysHit;

    /// Request-level counters.
    extern const Event DictCacheRequestTimeNs;
    extern const Event DictCacheRequests;

    /// Passed to the profiling read/write lock guards taken on rw_lock.
    extern const Event DictCacheLockWriteNs;
    extern const Event DictCacheLockReadNs;
}
namespace CurrentMetrics
{
2018-12-10 15:25:45 +00:00
extern const Metric DictCacheRequests ;
2017-04-08 01:32:05 +00:00
}
2016-06-07 21:07:44 +00:00
namespace DB
{
/// Error codes referenced by the cache dictionary; declared here, defined elsewhere.
namespace ErrorCodes
{
    extern const int CACHE_DICTIONARY_UPDATE_FAIL;
    extern const int TYPE_MISMATCH;
    extern const int BAD_ARGUMENTS;
    extern const int UNSUPPORTED_METHOD;
    extern const int TOO_SMALL_BUFFER_SIZE;
}
2021-02-16 21:33:02 +00:00
template < DictionaryKeyType dictionary_key_type >
CacheDictionary < dictionary_key_type > : : CacheDictionary (
2020-07-14 18:46:29 +00:00
const StorageID & dict_id_ ,
2019-08-03 11:02:40 +00:00
const DictionaryStructure & dict_struct_ ,
DictionarySourcePtr source_ptr_ ,
2021-02-16 21:33:02 +00:00
CacheDictionaryStoragePtr cache_storage_ptr_ ,
CacheDictionaryUpdateQueueConfiguration update_queue_configuration_ ,
2020-02-06 12:18:19 +00:00
DictionaryLifetime dict_lifetime_ ,
2021-02-16 21:33:02 +00:00
bool allow_read_expired_keys_ )
2020-07-14 18:46:29 +00:00
: IDictionary ( dict_id_ )
2019-08-03 11:02:40 +00:00
, dict_struct ( dict_struct_ )
, source_ptr { std : : move ( source_ptr_ ) }
2021-02-16 21:33:02 +00:00
, cache_storage_ptr ( cache_storage_ptr_ )
, update_queue (
dict_id_ . getNameForLogs ( ) ,
update_queue_configuration_ ,
2021-03-01 22:23:14 +00:00
[ this ] ( CacheDictionaryUpdateUnitPtr < dictionary_key_type > unit_to_update )
2021-02-16 21:33:02 +00:00
{
update ( unit_to_update ) ;
} )
2019-08-03 11:02:40 +00:00
, dict_lifetime ( dict_lifetime_ )
2020-05-30 21:57:37 +00:00
, log ( & Poco : : Logger : : get ( " ExternalDictionaries " ) )
2021-02-16 21:33:02 +00:00
, allow_read_expired_keys ( allow_read_expired_keys_ )
2018-12-10 15:25:45 +00:00
, rnd_engine ( randomSeed ( ) )
2016-06-07 21:07:44 +00:00
{
2020-07-20 13:44:07 +00:00
if ( ! source_ptr - > supportsSelectiveLoad ( ) )
2019-12-25 23:12:12 +00:00
throw Exception { full_name + " : source cannot be used with CacheDictionary " , ErrorCodes : : UNSUPPORTED_METHOD } ;
2016-06-07 21:07:44 +00:00
2021-02-16 21:33:02 +00:00
setupHierarchicalAttribute ( ) ;
2020-02-06 12:18:19 +00:00
}
2021-02-16 21:33:02 +00:00
/// Stops the update queue and waits for it before the dictionary is destroyed.
/// The queue callback calls update() on this object, so the queue must be stopped first.
template <DictionaryKeyType dictionary_key_type>
CacheDictionary<dictionary_key_type>::~CacheDictionary()
{
    update_queue.stopAndWait();
}
/// Returns the size reported by the cache storage, read under the read lock.
template <DictionaryKeyType dictionary_key_type>
size_t CacheDictionary<dictionary_key_type>::getElementCount() const
{
    const ProfilingScopedReadRWLock guard{rw_lock, ProfileEvents::DictCacheLockReadNs};

    const size_t element_count = cache_storage_ptr->getSize();
    return element_count;
}
2021-02-16 21:33:02 +00:00
/// Returns the number of bytes reported as allocated by the cache storage.
template <DictionaryKeyType dictionary_key_type>
size_t CacheDictionary<dictionary_key_type>::getBytesAllocated() const
{
    /// The storage may be modified by update(), which is called from another thread,
    /// so its size is read under the read lock.
    const ProfilingScopedReadRWLock guard{rw_lock, ProfileEvents::DictCacheLockReadNs};

    const size_t bytes_allocated = cache_storage_ptr->getBytesAllocated();
    return bytes_allocated;
}
2021-02-17 18:19:04 +00:00
/// Returns the ratio of the storage's current size to its maximum size, read under the read lock.
template <DictionaryKeyType dictionary_key_type>
double CacheDictionary<dictionary_key_type>::getLoadFactor() const
{
    const ProfilingScopedReadRWLock guard{rw_lock, ProfileEvents::DictCacheLockReadNs};

    const double current_size = static_cast<double>(cache_storage_ptr->getSize());
    return current_size / cache_storage_ptr->getMaxSize();
}
2021-02-16 21:33:02 +00:00
template < DictionaryKeyType dictionary_key_type >
std : : exception_ptr CacheDictionary < dictionary_key_type > : : getLastException ( ) const
{
const ProfilingScopedReadRWLock read_lock { rw_lock , ProfileEvents : : DictCacheLockReadNs } ;
return last_exception ;
2020-08-13 10:45:06 +00:00
}
2021-02-16 21:33:02 +00:00
/// Returns a non-owning pointer to the current dictionary source.
template <DictionaryKeyType dictionary_key_type>
const IDictionarySource * CacheDictionary<dictionary_key_type>::getSource() const
{
    /// getSourceAndUpdateIfNeeded() is used from another thread,
    /// so the source pointer is read under source_mutex.
    std::lock_guard source_guard(source_mutex);

    const IDictionarySource * current_source = source_ptr.get();
    return current_source;
}
2016-06-07 21:07:44 +00:00
2021-02-16 21:33:02 +00:00
template < DictionaryKeyType dictionary_key_type >
2021-02-19 13:32:26 +00:00
void CacheDictionary < dictionary_key_type > : : toParent ( const PaddedPODArray < UInt64 > & ids [[maybe_unused]], PaddedPODArray<UInt64> & out [[maybe_unused]] ) const
2016-06-07 21:07:44 +00:00
{
2021-02-16 21:33:02 +00:00
if constexpr ( dictionary_key_type = = DictionaryKeyType : : simple )
{
/// Run update on requested keys before fetch from storage
const auto & attribute_name = hierarchical_attribute - > name ;
2021-03-03 23:19:48 +00:00
2021-02-16 21:33:02 +00:00
auto result_type = std : : make_shared < DataTypeUInt64 > ( ) ;
2021-03-03 23:19:48 +00:00
auto input_column = result_type - > createColumn ( ) ;
auto & input_column_typed = assert_cast < ColumnVector < UInt64 > & > ( * input_column ) ;
auto & data = input_column_typed . getData ( ) ;
data . insert ( ids . begin ( ) , ids . end ( ) ) ;
auto column = getColumn ( { attribute_name } , result_type , { std : : move ( input_column ) } , { result_type } , { nullptr } ) ;
const auto & result_column_typed = assert_cast < const ColumnVector < UInt64 > & > ( * column ) ;
const auto & result_data = result_column_typed . getData ( ) ;
out . assign ( result_data ) ;
2021-02-16 21:33:02 +00:00
}
else
2021-02-18 16:57:20 +00:00
throw Exception ( " Hierarchy is not supported for complex key CacheDictionary " , ErrorCodes : : UNSUPPORTED_METHOD ) ;
2016-06-07 21:07:44 +00:00
}
2017-03-25 23:42:04 +00:00
/// Allow to use single value in same way as array.
2021-02-16 21:33:02 +00:00
static inline UInt64 getAt ( const PaddedPODArray < UInt64 > & arr , const size_t idx )
2018-12-10 15:25:45 +00:00
{
return arr [ idx ] ;
}
2021-02-16 21:33:02 +00:00
static inline UInt64 getAt ( const UInt64 & value , const size_t )
2018-12-10 15:25:45 +00:00
{
return value ;
}
2017-03-25 23:42:04 +00:00
2021-02-16 21:33:02 +00:00
template < DictionaryKeyType dictionary_key_type >
2017-03-25 23:42:04 +00:00
template < typename AncestorType >
2021-02-16 21:33:02 +00:00
void CacheDictionary < dictionary_key_type > : : isInImpl ( const PaddedPODArray < Key > & child_ids , const AncestorType & ancestor_ids , PaddedPODArray < UInt8 > & out ) const
2017-03-25 23:42:04 +00:00
{
2017-04-01 07:20:54 +00:00
/// Transform all children to parents until ancestor id or null_value will be reached.
2018-08-10 04:02:56 +00:00
size_t out_size = out . size ( ) ;
2018-12-10 15:25:45 +00:00
memset ( out . data ( ) , 0xFF , out_size ) ; /// 0xFF means "not calculated"
2017-04-01 07:20:54 +00:00
2021-02-16 21:33:02 +00:00
const auto null_value = hierarchical_attribute - > null_value . get < UInt64 > ( ) ;
2017-04-01 07:20:54 +00:00
2019-02-26 14:52:55 +00:00
PaddedPODArray < Key > children ( out_size , 0 ) ;
2017-04-01 07:20:54 +00:00
PaddedPODArray < Key > parents ( child_ids . begin ( ) , child_ids . end ( ) ) ;
2020-04-20 02:31:21 +00:00
for ( size_t i = 0 ; i < DBMS_HIERARCHICAL_DICTIONARY_MAX_DEPTH ; + + i )
2017-04-01 07:20:54 +00:00
{
size_t out_idx = 0 ;
size_t parents_idx = 0 ;
size_t new_children_idx = 0 ;
2018-08-10 04:02:56 +00:00
while ( out_idx < out_size )
2017-04-01 07:20:54 +00:00
{
/// Already calculated
if ( out [ out_idx ] ! = 0xFF )
{
+ + out_idx ;
continue ;
}
/// No parent
if ( parents [ parents_idx ] = = null_value )
{
out [ out_idx ] = 0 ;
}
/// Found ancestor
else if ( parents [ parents_idx ] = = getAt ( ancestor_ids , parents_idx ) )
{
out [ out_idx ] = 1 ;
}
2017-08-10 03:22:43 +00:00
/// Loop detected
2017-08-04 16:59:50 +00:00
else if ( children [ new_children_idx ] = = parents [ parents_idx ] )
{
2017-08-07 19:02:30 +00:00
out [ out_idx ] = 1 ;
2017-08-04 16:59:50 +00:00
}
2017-08-10 03:22:43 +00:00
/// Found intermediate parent, add this value to search at next loop iteration
2017-04-01 07:20:54 +00:00
else
{
children [ new_children_idx ] = parents [ parents_idx ] ;
+ + new_children_idx ;
}
+ + out_idx ;
+ + parents_idx ;
}
if ( new_children_idx = = 0 )
break ;
/// Transform all children to its parents.
children . resize ( new_children_idx ) ;
parents . resize ( new_children_idx ) ;
toParent ( children , parents ) ;
}
2017-03-25 23:42:04 +00:00
}
2021-02-16 21:33:02 +00:00
/// Row-wise check: one child and one ancestor per row. Delegates to isInImpl().
template <DictionaryKeyType dictionary_key_type>
void CacheDictionary<dictionary_key_type>::isInVectorVector(
    const PaddedPODArray<UInt64> & child_ids,
    const PaddedPODArray<UInt64> & ancestor_ids,
    PaddedPODArray<UInt8> & out) const
{
    isInImpl(child_ids, ancestor_ids, out);
}
2016-12-13 21:15:27 +00:00
2021-02-16 21:33:02 +00:00
/// Checks every child against one shared ancestor. Delegates to isInImpl().
template <DictionaryKeyType dictionary_key_type>
void CacheDictionary<dictionary_key_type>::isInVectorConstant(
    const PaddedPODArray<UInt64> & child_ids,
    const UInt64 ancestor_id,
    PaddedPODArray<UInt8> & out) const
{
    isInImpl(child_ids, ancestor_id, out);
}
2016-12-13 21:15:27 +00:00
2021-02-16 21:33:02 +00:00
/// Checks a single child against one ancestor per row.
///
/// The parent chain of the child (starting with the child itself) is collected once,
/// then every ancestor_ids[i] is searched in that chain and the result is stored in out[i].
template <DictionaryKeyType dictionary_key_type>
void CacheDictionary<dictionary_key_type>::isInConstantVector(const UInt64 child_id, const PaddedPODArray<UInt64> & ancestor_ids, PaddedPODArray<UInt8> & out) const
{
    /// Special case with single child value.

    const auto null_value = hierarchical_attribute->null_value.get<UInt64>();

    PaddedPODArray<Key> current(1, child_id);
    PaddedPODArray<Key> current_parent(1);
    std::vector<Key> chain(1, child_id);

    /// Iteratively find all ancestors for child.
    for (size_t depth = 0; depth < DBMS_HIERARCHICAL_DICTIONARY_MAX_DEPTH; ++depth)
    {
        toParent(current, current_parent);

        const auto next_id = current_parent[0];
        if (next_id == null_value)
            break;

        current[0] = next_id;
        chain.push_back(next_id);
    }

    /// Assuming short hierarchy, so linear search is Ok.
    const size_t rows = out.size();
    for (size_t row = 0; row < rows; ++row)
    {
        const bool is_in_chain = std::find(chain.begin(), chain.end(), ancestor_ids[row]) != chain.end();
        out[row] = is_in_chain;
    }
}
2016-12-13 21:15:27 +00:00
2021-02-16 21:33:02 +00:00
/// Scans the dictionary structure for attributes flagged as hierarchical and remembers a pointer to the
/// matching one in hierarchical_attribute (if several are flagged, the last one wins).
///
/// Throws TYPE_MISMATCH if a hierarchical attribute is not of UInt64 underlying type.
template <DictionaryKeyType dictionary_key_type>
void CacheDictionary<dictionary_key_type>::setupHierarchicalAttribute()
{
    /// TODO: Move this to DictionaryStructure
    for (const auto & candidate : dict_struct.attributes)
    {
        if (!candidate.hierarchical)
            continue;

        hierarchical_attribute = &candidate;

        if (candidate.underlying_type != AttributeUnderlyingType::utUInt64)
            throw Exception{full_name + " : hierarchical attribute must be UInt64. ", ErrorCodes::TYPE_MISMATCH};
    }
}
2021-01-23 13:18:24 +00:00
2021-02-16 21:33:02 +00:00
/// Single-attribute convenience wrapper: wraps the arguments into one-element lists,
/// calls getColumns() and returns the first (only) resulting column.
template <DictionaryKeyType dictionary_key_type>
ColumnPtr CacheDictionary<dictionary_key_type>::getColumn(
    const std::string & attribute_name,
    const DataTypePtr & result_type,
    const Columns & key_columns,
    const DataTypes & key_types,
    const ColumnPtr & default_values_column) const
{
    Columns single_attribute_result = getColumns({attribute_name}, {result_type}, key_columns, key_types, {default_values_column});
    return single_attribute_result.front();
}
2021-02-16 21:33:02 +00:00
/// Returns one column per requested attribute for the given keys.
///
/// For complex keys the key types are validated against the dictionary structure first.
/// Keys are extracted from key_columns (complex keys are placed into a local arena that lives
/// until this function returns) and the work is delegated to getColumnsImpl().
/// The second parameter (result types) is not used here.
template <DictionaryKeyType dictionary_key_type>
Columns CacheDictionary<dictionary_key_type>::getColumns(
    const Strings & attribute_names,
    const DataTypes &,
    const Columns & key_columns,
    const DataTypes & key_types,
    const Columns & default_values_columns) const
{
    /// The key type is a template parameter, so the branch is resolved at compile time
    /// like the other key-type checks in this file.
    if constexpr (dictionary_key_type == DictionaryKeyType::complex)
        dict_struct.validateKeyTypes(key_types);

    Arena complex_keys_arena;
    DictionaryKeysExtractor<dictionary_key_type> extractor(key_columns, complex_keys_arena);
    const auto & keys = extractor.getKeys();

    return getColumnsImpl(attribute_names, key_columns, keys, default_values_columns);
}
2021-01-23 13:18:24 +00:00
2021-02-16 21:33:02 +00:00
/// Fetches the requested attributes for `keys`, updating the cache from the source when needed.
///
/// Steps:
///  1. Fetch from the cache storage under the write lock and account hits / expired / not found keys.
///  2. If nothing is expired or missing, answer from the storage result alone.
///  3. If keys are only expired and reading expired keys is allowed, queue an update without waiting
///     and still answer from the storage result.
///  4. Otherwise queue an update, wait for it, and merge storage values, updated values and defaults.
template <DictionaryKeyType dictionary_key_type>
Columns CacheDictionary<dictionary_key_type>::getColumnsImpl(
    const Strings & attribute_names,
    const Columns & key_columns,
    const PaddedPODArray<KeyType> & keys,
    const Columns & default_values_columns) const
{
    DictionaryStorageFetchRequest request(dict_struct, attribute_names, default_values_columns);

    FetchResult storage_result;
    {
        /// Write lock on storage
        const ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs};

        storage_result = cache_storage_ptr->fetchColumnsForKeys(keys, request);
    }

    const size_t found_keys_size = storage_result.found_keys_size;
    const size_t expired_keys_size = storage_result.expired_keys_size;
    const size_t not_found_keys_size = storage_result.not_found_keys_size;

    ProfileEvents::increment(ProfileEvents::DictCacheKeysHit, found_keys_size);
    ProfileEvents::increment(ProfileEvents::DictCacheKeysExpired, expired_keys_size);
    ProfileEvents::increment(ProfileEvents::DictCacheKeysNotFound, not_found_keys_size);

    query_count.fetch_add(keys.size());
    hit_count.fetch_add(found_keys_size);

    MutableColumns & columns_from_storage = storage_result.fetched_columns;
    const PaddedPODArray<KeyState> & key_states = storage_result.key_index_to_state;

    const bool storage_keeps_key_order = cache_storage_ptr->returnsFetchedColumnsInOrderOfRequestedKeys();

    /// Builds the answer using only what the storage returned.
    auto make_result_from_storage_only = [&]() -> Columns
    {
        if (storage_keeps_key_order)
            return request.filterRequestedColumns(columns_from_storage);

        /// Reorder result from storage to requested keys indexes
        MutableColumns reordered_columns = aggregateColumnsInOrderOfKeys(
            keys,
            request,
            columns_from_storage,
            key_states);

        return request.filterRequestedColumns(reordered_columns);
    };

    const bool nothing_missing = not_found_keys_size == 0;

    /// All keys were found in storage
    if (nothing_missing && expired_keys_size == 0)
        return make_result_from_storage_only();

    size_t keys_to_update_size = not_found_keys_size + expired_keys_size;
    auto update_unit = std::make_shared<CacheDictionaryUpdateUnit<dictionary_key_type>>(key_columns, key_states, request, keys_to_update_size);

    HashMap<KeyType, size_t> update_key_to_row;
    MutableColumns columns_from_update = request.makeAttributesResultColumns();

    if (nothing_missing && expired_keys_size > 0 && allow_read_expired_keys)
    {
        /// Start async update only if allow read expired keys and all keys are found
        update_queue.tryPushToUpdateQueueOrThrow(update_unit);

        return make_result_from_storage_only();
    }

    /// Start sync update
    update_queue.tryPushToUpdateQueueOrThrow(update_unit);
    update_queue.waitForCurrentUpdateFinish(update_unit);

    update_key_to_row = std::move(update_unit->requested_keys_to_fetched_columns_during_update_index);
    columns_from_update = std::move(update_unit->fetched_columns_during_update);

    MutableColumns merged_columns = aggregateColumns(
        keys,
        request,
        columns_from_storage,
        key_states,
        columns_from_update,
        update_key_to_row);

    return request.filterRequestedColumns(merged_columns);
}
2021-02-16 21:33:02 +00:00
template < DictionaryKeyType dictionary_key_type >
2021-03-01 22:23:14 +00:00
ColumnUInt8 : : Ptr CacheDictionary < dictionary_key_type > : : hasKeys ( const Columns & key_columns , const DataTypes & key_types ) const
2021-01-23 13:18:24 +00:00
{
2021-03-01 22:23:14 +00:00
if ( dictionary_key_type = = DictionaryKeyType : : complex )
dict_struct . validateKeyTypes ( key_types ) ;
2021-02-27 20:39:34 +00:00
Arena complex_keys_arena ;
DictionaryKeysExtractor < dictionary_key_type > extractor ( key_columns , complex_keys_arena ) ;
2021-02-16 21:33:02 +00:00
const auto & keys = extractor . getKeys ( ) ;
2021-01-23 13:18:24 +00:00
2021-02-18 16:57:20 +00:00
/// We make empty request just to fetch if keys exists
2021-03-03 18:58:43 +00:00
DictionaryStorageFetchRequest request ( dict_struct , { } , { } ) ;
2020-12-27 17:23:09 +00:00
2021-02-16 21:33:02 +00:00
FetchResult result_of_fetch_from_storage ;
2021-01-23 13:18:24 +00:00
{
2021-02-17 11:48:06 +00:00
/// Write lock on storage
const ProfilingScopedWriteRWLock write_lock { rw_lock , ProfileEvents : : DictCacheLockWriteNs } ;
2021-01-23 13:18:24 +00:00
2021-03-02 13:07:17 +00:00
result_of_fetch_from_storage = cache_storage_ptr - > fetchColumnsForKeys ( keys , request ) ;
2021-02-16 21:33:02 +00:00
}
2021-01-23 13:18:24 +00:00
2021-02-26 15:56:41 +00:00
size_t found_keys_size = result_of_fetch_from_storage . found_keys_size ;
size_t expired_keys_size = result_of_fetch_from_storage . expired_keys_size ;
size_t not_found_keys_size = result_of_fetch_from_storage . not_found_keys_size ;
2021-01-23 13:18:24 +00:00
2021-02-26 15:56:41 +00:00
ProfileEvents : : increment ( ProfileEvents : : DictCacheKeysHit , found_keys_size ) ;
2021-02-16 21:33:02 +00:00
ProfileEvents : : increment ( ProfileEvents : : DictCacheKeysExpired , expired_keys_size ) ;
ProfileEvents : : increment ( ProfileEvents : : DictCacheKeysNotFound , not_found_keys_size ) ;
query_count . fetch_add ( keys . size ( ) ) ;
hit_count . fetch_add ( found_keys_size ) ;
2021-03-03 18:58:43 +00:00
size_t keys_to_update_size = expired_keys_size + not_found_keys_size ;
auto update_unit = std : : make_shared < CacheDictionaryUpdateUnit < dictionary_key_type > > ( key_columns , result_of_fetch_from_storage . key_index_to_state , request , keys_to_update_size ) ;
2021-01-23 13:18:24 +00:00
2021-02-16 21:33:02 +00:00
HashMap < KeyType , size_t > requested_keys_to_fetched_columns_during_update_index ;
2021-03-03 18:58:43 +00:00
bool allow_expired_keys_during_aggregation = false ;
2020-12-27 17:23:09 +00:00
2021-02-16 21:33:02 +00:00
if ( not_found_keys_size = = 0 & & expired_keys_size = = 0 )
2021-01-23 13:18:24 +00:00
{
2021-02-16 21:33:02 +00:00
/// All keys were found in storage
2021-03-03 18:58:43 +00:00
if ( result_of_fetch_from_storage . default_keys_size = = 0 )
return ColumnUInt8 : : create ( keys . size ( ) , true ) ;
allow_expired_keys_during_aggregation = true ;
2021-02-16 21:33:02 +00:00
}
else if ( not_found_keys_size = = 0 & & expired_keys_size > 0 & & allow_read_expired_keys )
{
/// Start async update only if allow read expired keys and all keys are found
update_queue . tryPushToUpdateQueueOrThrow ( update_unit ) ;
2021-01-23 13:18:24 +00:00
2021-03-03 18:58:43 +00:00
if ( result_of_fetch_from_storage . default_keys_size = = 0 )
return ColumnUInt8 : : create ( keys . size ( ) , true ) ;
allow_expired_keys_during_aggregation = true ;
2021-02-16 21:33:02 +00:00
}
2021-03-03 18:58:43 +00:00
else
2021-02-16 21:33:02 +00:00
{
/// Start sync update
update_queue . tryPushToUpdateQueueOrThrow ( update_unit ) ;
update_queue . waitForCurrentUpdateFinish ( update_unit ) ;
2021-01-23 13:18:24 +00:00
2021-02-16 21:33:02 +00:00
requested_keys_to_fetched_columns_during_update_index = std : : move ( update_unit - > requested_keys_to_fetched_columns_during_update_index ) ;
}
2021-01-23 13:18:24 +00:00
2021-02-16 21:33:02 +00:00
auto result = ColumnUInt8 : : create ( keys . size ( ) , false ) ;
auto & data = result - > getData ( ) ;
2021-01-23 13:18:24 +00:00
2021-02-16 21:33:02 +00:00
for ( size_t key_index = 0 ; key_index < keys . size ( ) ; + + key_index )
{
auto key = keys [ key_index ] ;
2021-01-23 13:18:24 +00:00
2021-03-03 18:58:43 +00:00
bool valid_expired_key = allow_expired_keys_during_aggregation & & result_of_fetch_from_storage . key_index_to_state [ key_index ] . isExpired ( ) ;
if ( result_of_fetch_from_storage . key_index_to_state [ key_index ] . isFound ( ) | | valid_expired_key )
2021-01-23 13:18:24 +00:00
{
2021-02-16 21:33:02 +00:00
/// Check if key was fetched from cache
2021-03-03 18:58:43 +00:00
data [ key_index ] = ! result_of_fetch_from_storage . key_index_to_state [ key_index ] . isDefault ( ) ;
2021-02-16 21:33:02 +00:00
}
2021-03-03 18:58:43 +00:00
if ( requested_keys_to_fetched_columns_during_update_index . has ( key ) )
2021-02-16 21:33:02 +00:00
{
/// Check if key was not in cache and was fetched during update
data [ key_index ] = true ;
2020-12-27 17:23:09 +00:00
}
2021-01-23 13:18:24 +00:00
}
2020-12-27 17:23:09 +00:00
2021-02-16 21:33:02 +00:00
return result ;
}
2021-02-17 18:19:04 +00:00
/// Rebuilds the fetched columns in the order of the requested keys.
///
/// For every attribute that should be filled, walks over the keys and copies the row addressed by the
/// key state's fetched column index; keys whose state is "not found" are skipped.
template <DictionaryKeyType dictionary_key_type>
MutableColumns CacheDictionary<dictionary_key_type>::aggregateColumnsInOrderOfKeys(
    const PaddedPODArray<KeyType> & keys,
    const DictionaryStorageFetchRequest & request,
    const MutableColumns & fetched_columns,
    const PaddedPODArray<KeyState> & key_index_to_state)
{
    MutableColumns ordered_columns = request.makeAttributesResultColumns();

    const size_t attributes_size = request.attributesSize();
    const size_t keys_size = keys.size();

    for (size_t attribute_index = 0; attribute_index < attributes_size; ++attribute_index)
    {
        if (!request.shouldFillResultColumnWithIndex(attribute_index))
            continue;

        const auto & destination = ordered_columns[attribute_index];
        const auto & origin = fetched_columns[attribute_index];

        for (size_t row = 0; row < keys_size; ++row)
        {
            auto key_state = key_index_to_state[row];

            if (!key_state.isNotFound())
                destination->insertFrom(*origin, key_state.getFetchedColumnIndex());
        }
    }

    return ordered_columns;
}
2021-02-16 21:33:02 +00:00
/// Merges values from the storage fetch and from the update into result columns, in key order.
///
/// For every attribute that should be filled and every key, the value is taken from, in this order:
///  - the storage result, if the key state is "found";
///  - the columns fetched during update, if the key is present in the update index;
///  - the default value provider otherwise.
template <DictionaryKeyType dictionary_key_type>
MutableColumns CacheDictionary<dictionary_key_type>::aggregateColumns(
    const PaddedPODArray<KeyType> & keys,
    const DictionaryStorageFetchRequest & request,
    const MutableColumns & fetched_columns_from_storage,
    const PaddedPODArray<KeyState> & key_index_to_fetched_columns_from_storage_result,
    const MutableColumns & fetched_columns_during_update,
    const HashMap<KeyType, size_t> & found_keys_to_fetched_columns_during_update_index)
{
    MutableColumns merged_columns = request.makeAttributesResultColumns();

    for (size_t attribute_index = 0; attribute_index < request.attributesSize(); ++attribute_index)
    {
        if (!request.shouldFillResultColumnWithIndex(attribute_index))
            continue;

        const auto & destination = merged_columns[attribute_index];
        const auto & storage_column = fetched_columns_from_storage[attribute_index];
        const auto & update_column = fetched_columns_during_update[attribute_index];
        const auto & defaults = request.defaultValueProviderAtIndex(attribute_index);

        for (size_t row = 0; row < keys.size(); ++row)
        {
            auto key = keys[row];
            auto storage_state = key_index_to_fetched_columns_from_storage_result[row];

            if (storage_state.isFound())
            {
                /// Check and insert value if key was fetched from cache
                destination->insertFrom(*storage_column, storage_state.getFetchedColumnIndex());
            }
            else if (const auto * update_entry = found_keys_to_fetched_columns_during_update_index.find(key))
            {
                /// Check and insert value if key was not in cache and was fetched during update
                destination->insertFrom(*update_column, update_entry->getMapped());
            }
            else
            {
                /// Insert default value
                destination->insert(defaults.getDefaultValue(row));
            }
        }
    }

    return merged_columns;
}
/// Creates a block input stream over the keys that are currently cached.
///
/// The cached keys are taken from the storage (simple or complex variant, depending on the key type)
/// and the stream is constructed while the write lock is held.
template <DictionaryKeyType dictionary_key_type>
BlockInputStreamPtr CacheDictionary<dictionary_key_type>::getBlockInputStream(const Names & column_names, size_t max_block_size) const
{
    using StreamType = DictionaryBlockInputStream<Key>;

    std::shared_ptr<StreamType> cached_keys_stream;

    {
        /// Write lock on storage
        const ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs};

        if constexpr (dictionary_key_type == DictionaryKeyType::simple)
        {
            cached_keys_stream = std::make_shared<StreamType>(shared_from_this(), max_block_size, cache_storage_ptr->getCachedSimpleKeys(), column_names);
        }
        else
        {
            /// Complex keys are passed as a named local, exactly as the stream constructor is called for this case.
            auto cached_complex_keys = cache_storage_ptr->getCachedComplexKeys();
            cached_keys_stream = std::make_shared<StreamType>(shared_from_this(), max_block_size, cached_complex_keys, column_names);
        }
    }

    return cached_keys_stream;
}
2021-01-23 13:18:24 +00:00
2021-02-16 21:33:02 +00:00
template < DictionaryKeyType dictionary_key_type >
2021-03-01 22:23:14 +00:00
void CacheDictionary < dictionary_key_type > : : update ( CacheDictionaryUpdateUnitPtr < dictionary_key_type > update_unit_ptr )
2021-02-16 21:33:02 +00:00
{
CurrentMetrics : : Increment metric_increment { CurrentMetrics : : DictCacheRequests } ;
2021-01-23 13:18:24 +00:00
2021-02-16 21:33:02 +00:00
size_t found_num = 0 ;
2021-01-23 13:18:24 +00:00
2021-03-03 18:58:43 +00:00
DictionaryKeysExtractor < dictionary_key_type > requested_keys_extractor ( update_unit_ptr - > key_columns , update_unit_ptr - > complex_key_arena ) ;
const auto & requested_keys = requested_keys_extractor . getKeys ( ) ;
HashSet < KeyType > not_found_keys ;
2021-02-16 21:33:02 +00:00
std : : vector < UInt64 > requested_keys_vector ;
2021-03-03 18:58:43 +00:00
std : : vector < size_t > requested_complex_key_rows ;
2021-01-23 13:18:24 +00:00
2021-03-03 18:58:43 +00:00
auto & key_index_to_state_from_storage = update_unit_ptr - > key_index_to_state ;
2021-01-23 13:18:24 +00:00
2021-03-03 18:58:43 +00:00
for ( size_t i = 0 ; i < key_index_to_state_from_storage . size ( ) ; + + i )
2021-02-16 21:33:02 +00:00
{
2021-03-03 18:58:43 +00:00
if ( key_index_to_state_from_storage [ i ] . isExpired ( )
| | key_index_to_state_from_storage [ i ] . isNotFound ( ) )
{
if constexpr ( dictionary_key_type = = DictionaryKeyType : : simple )
requested_keys_vector . emplace_back ( requested_keys [ i ] ) ;
else
requested_complex_key_rows . emplace_back ( i ) ;
2021-01-23 13:18:24 +00:00
2021-03-03 18:58:43 +00:00
auto requested_key = requested_keys [ i ] ;
not_found_keys . insert ( requested_key ) ;
}
2021-02-16 21:33:02 +00:00
}
2021-03-03 18:58:43 +00:00
size_t requested_keys_size = 0 ;
if constexpr ( dictionary_key_type = = DictionaryKeyType : : simple )
requested_keys_size = requested_keys_vector . size ( ) ;
2021-02-16 21:33:02 +00:00
else
2021-03-03 18:58:43 +00:00
requested_keys_size = requested_complex_key_rows . size ( ) ;
2020-09-17 18:57:57 +00:00
2021-03-01 22:23:14 +00:00
ProfileEvents : : increment ( ProfileEvents : : DictCacheKeysRequested , requested_keys_size ) ;
2021-02-16 21:33:02 +00:00
const auto & fetch_request = update_unit_ptr - > request ;
2020-09-17 18:57:57 +00:00
2021-02-16 21:33:02 +00:00
const auto now = std : : chrono : : system_clock : : now ( ) ;
2016-06-07 21:07:44 +00:00
2021-02-16 21:33:02 +00:00
if ( now > backoff_end_time . load ( ) )
2020-10-02 19:09:48 +00:00
{
2021-02-16 21:33:02 +00:00
try
{
auto current_source_ptr = getSourceAndUpdateIfNeeded ( ) ;
2020-10-02 19:09:48 +00:00
2021-02-16 21:33:02 +00:00
Stopwatch watch ;
BlockInputStreamPtr stream ;
2020-10-02 19:09:48 +00:00
2021-02-16 21:33:02 +00:00
if constexpr ( dictionary_key_type = = DictionaryKeyType : : simple )
stream = current_source_ptr - > loadIds ( requested_keys_vector ) ;
else
2021-03-03 18:58:43 +00:00
stream = current_source_ptr - > loadKeys ( update_unit_ptr - > key_columns , requested_complex_key_rows ) ;
2020-11-13 16:16:56 +00:00
2021-02-16 21:33:02 +00:00
stream - > readPrefix ( ) ;
2020-10-02 19:09:48 +00:00
2021-02-16 21:33:02 +00:00
size_t skip_keys_size_offset = dict_struct . getKeysSize ( ) ;
2021-03-03 18:58:43 +00:00
PaddedPODArray < KeyType > found_keys_in_source ;
Columns fetched_columns_during_update = fetch_request . makeAttributesResultColumnsNonMutable ( ) ;
2017-04-01 07:20:54 +00:00
2021-02-16 21:33:02 +00:00
while ( Block block = stream - > read ( ) )
2017-04-01 07:20:54 +00:00
{
2021-02-16 21:33:02 +00:00
Columns key_columns ;
key_columns . reserve ( skip_keys_size_offset ) ;
2017-04-01 07:20:54 +00:00
2021-02-16 21:33:02 +00:00
auto block_columns = block . getColumns ( ) ;
2017-02-14 18:06:21 +00:00
2021-03-03 18:58:43 +00:00
/// Split into keys columns and attribute columns
2021-02-16 21:33:02 +00:00
for ( size_t i = 0 ; i < skip_keys_size_offset ; + + i )
{
key_columns . emplace_back ( * block_columns . begin ( ) ) ;
block_columns . erase ( block_columns . begin ( ) ) ;
}
2020-12-27 17:23:09 +00:00
2021-02-27 20:39:34 +00:00
DictionaryKeysExtractor < dictionary_key_type > keys_extractor ( key_columns , update_unit_ptr - > complex_key_arena ) ;
2021-02-16 21:33:02 +00:00
const auto & keys = keys_extractor . getKeys ( ) ;
2020-12-27 17:23:09 +00:00
2021-03-03 18:58:43 +00:00
for ( size_t index_of_attribute = 0 ; index_of_attribute < fetched_columns_during_update . size ( ) ; + + index_of_attribute )
2021-02-16 21:33:02 +00:00
{
2021-03-03 18:58:43 +00:00
auto & column_to_update = fetched_columns_during_update [ index_of_attribute ] ;
auto column = block . safeGetByPosition ( skip_keys_size_offset + index_of_attribute ) . column ;
column_to_update - > assumeMutable ( ) - > insertRangeFrom ( * column , 0 , keys . size ( ) ) ;
2021-02-16 21:33:02 +00:00
}
2017-04-01 07:20:54 +00:00
2021-02-16 21:33:02 +00:00
for ( size_t i = 0 ; i < keys . size ( ) ; + + i )
{
auto fetched_key_from_source = keys [ i ] ;
2021-03-03 18:58:43 +00:00
not_found_keys . erase ( fetched_key_from_source ) ;
update_unit_ptr - > requested_keys_to_fetched_columns_during_update_index [ fetched_key_from_source ] = found_num ;
found_keys_in_source . emplace_back ( fetched_key_from_source ) ;
+ + found_num ;
2021-02-16 21:33:02 +00:00
}
2021-03-03 18:58:43 +00:00
}
2021-02-18 16:57:20 +00:00
2021-03-03 18:58:43 +00:00
PaddedPODArray < KeyType > not_found_keys_in_source ;
not_found_keys_in_source . reserve ( not_found_keys . size ( ) ) ;
for ( auto & cell : not_found_keys )
not_found_keys_in_source . emplace_back ( cell . getKey ( ) ) ;
{
/// Lock for cache modification
ProfilingScopedWriteRWLock write_lock { rw_lock , ProfileEvents : : DictCacheLockWriteNs } ;
cache_storage_ptr - > insertColumnsForKeys ( found_keys_in_source , fetched_columns_during_update ) ;
cache_storage_ptr - > insertDefaultKeys ( not_found_keys_in_source ) ;
2021-02-16 21:33:02 +00:00
}
2019-12-19 18:22:04 +00:00
2021-03-03 18:58:43 +00:00
auto & update_unit_ptr_mutable_columns = update_unit_ptr - > fetched_columns_during_update ;
for ( const auto & fetched_column : fetched_columns_during_update )
update_unit_ptr_mutable_columns . emplace_back ( fetched_column - > assumeMutable ( ) ) ;
2021-02-16 21:33:02 +00:00
stream - > readSuffix ( ) ;
2020-09-17 18:57:57 +00:00
2021-02-16 21:33:02 +00:00
error_count = 0 ;
last_exception = std : : exception_ptr { } ;
backoff_end_time = std : : chrono : : system_clock : : time_point { } ;
2017-04-01 07:20:54 +00:00
2021-02-16 21:33:02 +00:00
ProfileEvents : : increment ( ProfileEvents : : DictCacheRequestTimeNs , watch . elapsed ( ) ) ;
}
catch ( . . . )
2017-04-01 07:20:54 +00:00
{
2021-02-16 21:33:02 +00:00
/// Lock just for last_exception safety
ProfilingScopedWriteRWLock write_lock { rw_lock , ProfileEvents : : DictCacheLockWriteNs } ;
+ + error_count ;
last_exception = std : : current_exception ( ) ;
backoff_end_time = now + std : : chrono : : seconds ( calculateDurationWithBackoff ( rnd_engine , error_count ) ) ;
2020-02-06 12:18:19 +00:00
2021-02-16 21:33:02 +00:00
tryLogException ( last_exception , log ,
" Could not update cache dictionary ' " + getDictionaryID ( ) . getNameForLogs ( ) +
" ', next update is scheduled at " + ext : : to_string ( backoff_end_time . load ( ) ) ) ;
try
2020-11-13 16:16:56 +00:00
{
2021-02-16 21:33:02 +00:00
std : : rethrow_exception ( last_exception ) ;
2020-11-13 16:16:56 +00:00
}
2021-02-16 21:33:02 +00:00
catch ( . . . )
2020-11-13 16:16:56 +00:00
{
2021-02-16 21:33:02 +00:00
throw DB : : Exception ( ErrorCodes : : CACHE_DICTIONARY_UPDATE_FAIL ,
" Update failed for dictionary {} : {} " ,
getDictionaryID ( ) . getNameForLogs ( ) ,
getCurrentExceptionMessage ( true /*with stack trace*/ ,
true /*check embedded stack trace*/ ) ) ;
2020-11-13 16:16:56 +00:00
}
2017-04-01 07:20:54 +00:00
}
2021-02-16 21:33:02 +00:00
ProfileEvents : : increment ( ProfileEvents : : DictCacheKeysRequestedMiss , requested_keys_size - found_num ) ;
ProfileEvents : : increment ( ProfileEvents : : DictCacheKeysRequestedFound , found_num ) ;
ProfileEvents : : increment ( ProfileEvents : : DictCacheRequests ) ;
}
else
{
/// Won't request source for keys
throw DB : : Exception ( ErrorCodes : : CACHE_DICTIONARY_UPDATE_FAIL ,
" Query contains keys that are not present in cache or expired. Could not update cache dictionary {} now, because nearest update is scheduled at {}. Try again later. " ,
getDictionaryID ( ) . getNameForLogs ( ) ,
ext : : to_string ( backoff_end_time . load ( ) ) ) ;
2017-04-01 07:20:54 +00:00
}
2021-02-16 21:33:02 +00:00
}
2017-04-01 07:20:54 +00:00
2021-02-16 21:33:02 +00:00
/// Explicit instantiations of CacheDictionary for the simple and the complex key types.
template class CacheDictionary < DictionaryKeyType : : simple > ;
template class CacheDictionary < DictionaryKeyType : : complex > ;
2017-04-01 07:20:54 +00:00
2021-02-16 21:33:02 +00:00
namespace
{
2019-12-26 18:56:34 +00:00
2021-02-16 21:33:02 +00:00
/** Reads the storage settings of the 'cache' / 'complex_key_cache' layout.
  *
  * Settings are read from "<layout_prefix>.cache." or, when is_complex is set,
  * from "<layout_prefix>.complex_key_cache.":
  *  - size_in_cells (required, must not be 0), rounded up with roundUpToPowerOfTwoOrZero;
  *  - strict_max_lifetime_seconds (defaults to dict_lifetime.max_sec).
  *
  * full_name is used only as a prefix of error messages.
  * Throws TOO_SMALL_BUFFER_SIZE if size_in_cells is 0.
  */
CacheDictionaryStorageConfiguration parseCacheStorageConfiguration(
    const std::string & full_name,
    const Poco::Util::AbstractConfiguration & config,
    const std::string & layout_prefix,
    const DictionaryLifetime & dict_lifetime,
    bool is_complex)
{
    /// The same name selects the configuration section and is reported in error messages,
    /// so that the message names the layout that was actually configured.
    const std::string layout_name = is_complex ? "complex_key_cache" : "cache";
    const std::string dictionary_configuration_prefix = layout_prefix + "." + layout_name + ".";

    const size_t size = config.getUInt64(dictionary_configuration_prefix + "size_in_cells");
    if (size == 0)
        throw Exception{full_name + ": dictionary of layout '" + layout_name + "' cannot have 0 cells",
                        ErrorCodes::TOO_SMALL_BUFFER_SIZE};

    const size_t strict_max_lifetime_seconds =
        config.getUInt64(dictionary_configuration_prefix + "strict_max_lifetime_seconds",
                         static_cast<size_t>(dict_lifetime.max_sec));

    const size_t rounded_size = roundUpToPowerOfTwoOrZero(size);

    CacheDictionaryStorageConfiguration storage_configuration{
        rounded_size,
        strict_max_lifetime_seconds,
        dict_lifetime
    };

    return storage_configuration;
}
2020-12-27 17:23:09 +00:00
2021-02-25 14:39:05 +00:00
# if defined(OS_LINUX) || defined(__FreeBSD__)
2021-02-16 21:33:02 +00:00
/** Reads the storage settings of the 'ssd_cache' / 'complex_key_ssd_cache' layout.
  *
  * Settings are read from "<layout_prefix>.ssd_cache." or, when is_complex is set,
  * from "<layout_prefix>.complex_key_ssd_cache.":
  *  - strict_max_lifetime_seconds (defaults to dict_lifetime.max_sec);
  *  - max_partitions_count, block_size, file_size, read_buffer_size, write_buffer_size, max_stored_keys
  *    (all optional, defaults are the constants below);
  *  - path (required, must not be empty); a path that does not start with '/' is resolved
  *    relative to the top-level "path" setting.
  *
  * file_size, read_buffer_size and write_buffer_size must be multiples of block_size and are
  * passed to the resulting configuration as block counts.
  *
  * full_name is used only as a prefix of error messages.
  * Throws BAD_ARGUMENTS if block_size is 0, if a size is not a multiple of block_size
  * or if path is empty.
  */
SSDCacheDictionaryStorageConfiguration parseSSDCacheStorageConfiguration(
    const std::string & full_name,
    const Poco::Util::AbstractConfiguration & config,
    const std::string & layout_prefix,
    const DictionaryLifetime & dict_lifetime,
    bool is_complex)
{
    const std::string layout_name = is_complex ? "complex_key_ssd_cache" : "ssd_cache";
    const std::string dictionary_configuration_prefix = layout_prefix + "." + layout_name + ".";

    const size_t strict_max_lifetime_seconds =
        config.getUInt64(dictionary_configuration_prefix + "strict_max_lifetime_seconds",
                         static_cast<size_t>(dict_lifetime.max_sec));

    static constexpr size_t DEFAULT_SSD_BLOCK_SIZE_BYTES = DEFAULT_AIO_FILE_BLOCK_SIZE;
    static constexpr size_t DEFAULT_FILE_SIZE_BYTES = 4 * 1024 * 1024 * 1024ULL;
    static constexpr size_t DEFAULT_READ_BUFFER_SIZE_BYTES = 16 * DEFAULT_SSD_BLOCK_SIZE_BYTES;
    static constexpr size_t DEFAULT_WRITE_BUFFER_SIZE_BYTES = DEFAULT_SSD_BLOCK_SIZE_BYTES;

    static constexpr size_t DEFAULT_MAX_STORED_KEYS = 100000;
    static constexpr size_t DEFAULT_PARTITIONS_COUNT = 16;

    /// All values below are read as unsigned: they are sizes and counts, and reading them as
    /// signed would silently turn a negative value into a huge size_t.

    /// Historically this setting was read from "<prefix>ssd_cache.max_partitions_count",
    /// i.e. with the layout name repeated. The plain key is preferred now; the old key is
    /// still honoured as a fallback so that existing configurations keep working.
    const size_t legacy_max_partitions_count =
        config.getUInt64(dictionary_configuration_prefix + "ssd_cache.max_partitions_count", DEFAULT_PARTITIONS_COUNT);
    const size_t max_partitions_count =
        config.getUInt64(dictionary_configuration_prefix + "max_partitions_count", legacy_max_partitions_count);

    const size_t block_size = config.getUInt64(dictionary_configuration_prefix + "block_size", DEFAULT_SSD_BLOCK_SIZE_BYTES);
    /// block_size is used as a divisor below.
    if (block_size == 0)
        throw Exception{full_name + ": block_size cannot be 0", ErrorCodes::BAD_ARGUMENTS};

    const size_t file_size = config.getUInt64(dictionary_configuration_prefix + "file_size", DEFAULT_FILE_SIZE_BYTES);
    if (file_size % block_size != 0)
        throw Exception{full_name + ": file_size must be a multiple of block_size", ErrorCodes::BAD_ARGUMENTS};

    const size_t read_buffer_size = config.getUInt64(dictionary_configuration_prefix + "read_buffer_size", DEFAULT_READ_BUFFER_SIZE_BYTES);
    if (read_buffer_size % block_size != 0)
        throw Exception{full_name + ": read_buffer_size must be a multiple of block_size", ErrorCodes::BAD_ARGUMENTS};

    const size_t write_buffer_size = config.getUInt64(dictionary_configuration_prefix + "write_buffer_size", DEFAULT_WRITE_BUFFER_SIZE_BYTES);
    if (write_buffer_size % block_size != 0)
        throw Exception{full_name + ": write_buffer_size must be a multiple of block_size", ErrorCodes::BAD_ARGUMENTS};

    auto directory_path = config.getString(dictionary_configuration_prefix + "path");
    if (directory_path.empty())
        throw Exception{full_name + ": dictionary of layout '" + layout_name + "' cannot have empty path",
                        ErrorCodes::BAD_ARGUMENTS};

    /// Relative path: resolve against the top-level "path" setting. operator/ inserts a
    /// separator only when the base path does not already end with one.
    if (directory_path.at(0) != '/')
        directory_path = (std::filesystem::path{config.getString("path")} / directory_path).string();

    const size_t max_stored_keys_in_partition = config.getUInt64(dictionary_configuration_prefix + "max_stored_keys", DEFAULT_MAX_STORED_KEYS);
    const size_t rounded_size = roundUpToPowerOfTwoOrZero(max_stored_keys_in_partition);

    SSDCacheDictionaryStorageConfiguration configuration{
        strict_max_lifetime_seconds,
        dict_lifetime,
        directory_path,
        max_partitions_count,
        rounded_size,
        block_size,
        file_size / block_size,
        read_buffer_size / block_size,
        write_buffer_size / block_size
    };

    return configuration;
}
2016-06-07 21:07:44 +00:00
2021-02-25 14:39:05 +00:00
# endif
2021-02-16 21:33:02 +00:00
/** Reads the update queue settings of a cache dictionary:
  *  - max_update_queue_size (default 100000, must not be 0);
  *  - update_queue_push_timeout_milliseconds (default 10, must be at least 10);
  *  - query_wait_timeout_milliseconds (default 60000);
  *  - max_threads_for_updates (default 4, must not be 0).
  *
  * full_name is used as a prefix of error messages; is_complex only selects the layout name
  * reported in them.
  *
  * Throws TOO_SMALL_BUFFER_SIZE for a zero queue size and BAD_ARGUMENTS for the other violations.
  *
  * NOTE(review): the first three settings are always read from "<layout_prefix>.cache." and
  * max_threads_for_updates from "<layout_prefix>." directly, regardless of is_complex.
  * The keys are kept as they are here; confirm against the documented configuration format
  * whether per-layout sections are expected instead.
  */
CacheDictionaryUpdateQueueConfiguration parseCacheDictionaryUpdateQueueConfiguration(
    const std::string & full_name,
    const Poco::Util::AbstractConfiguration & config,
    const std::string & layout_prefix,
    bool is_complex)
{
    const std::string type = is_complex ? "complex_key_cache" : "cache";
    const std::string message_prefix = full_name + ": dictionary of layout '" + type + "' ";

    const size_t max_update_queue_size =
        config.getUInt64(layout_prefix + ".cache.max_update_queue_size", 100000);
    if (max_update_queue_size == 0)
        throw Exception{message_prefix + "cannot have empty update queue of size 0",
                        ErrorCodes::TOO_SMALL_BUFFER_SIZE};

    const size_t update_queue_push_timeout_milliseconds =
        config.getUInt64(layout_prefix + ".cache.update_queue_push_timeout_milliseconds", 10);
    if (update_queue_push_timeout_milliseconds < 10)
        throw Exception{message_prefix + "has too little update_queue_push_timeout",
                        ErrorCodes::BAD_ARGUMENTS};

    const size_t query_wait_timeout_milliseconds =
        config.getUInt64(layout_prefix + ".cache.query_wait_timeout_milliseconds", 60000);

    const size_t max_threads_for_updates =
        config.getUInt64(layout_prefix + ".max_threads_for_updates", 4);
    if (max_threads_for_updates == 0)
        throw Exception{message_prefix + "cannot have zero threads for updates.",
                        ErrorCodes::BAD_ARGUMENTS};

    CacheDictionaryUpdateQueueConfiguration update_queue_configuration{
        max_update_queue_size,
        max_threads_for_updates,
        update_queue_push_timeout_milliseconds,
        query_wait_timeout_milliseconds};

    return update_queue_configuration;
}
2019-08-30 09:50:38 +00:00
}
2018-11-28 11:37:12 +00:00
void registerDictionaryCache ( DictionaryFactory & factory )
{
2021-02-16 21:33:02 +00:00
auto create_simple_cache_layout = [ = ] ( const std : : string & full_name ,
2018-12-10 15:25:45 +00:00
const DictionaryStructure & dict_struct ,
const Poco : : Util : : AbstractConfiguration & config ,
const std : : string & config_prefix ,
2018-12-10 15:50:58 +00:00
DictionarySourcePtr source_ptr ) - > DictionaryPtr
{
2018-11-28 11:37:12 +00:00
if ( dict_struct . key )
2020-02-06 12:18:19 +00:00
throw Exception { " 'key' is not supported for dictionary of layout 'cache' " ,
ErrorCodes : : UNSUPPORTED_METHOD } ;
2018-11-28 11:37:12 +00:00
if ( dict_struct . range_min | | dict_struct . range_max )
2019-12-25 23:12:12 +00:00
throw Exception { full_name
2018-12-10 15:25:45 +00:00
+ " : elements .structure.range_min and .structure.range_max should be defined only "
" for a dictionary of layout 'range_hashed' " ,
ErrorCodes : : BAD_ARGUMENTS } ;
2018-11-28 11:37:12 +00:00
const bool require_nonempty = config . getBool ( config_prefix + " .require_nonempty " , false ) ;
if ( require_nonempty )
2019-12-25 23:12:12 +00:00
throw Exception { full_name + " : dictionary of layout 'cache' cannot have 'require_nonempty' attribute set " ,
2018-12-10 15:25:45 +00:00
ErrorCodes : : BAD_ARGUMENTS } ;
2018-11-28 11:37:12 +00:00
2020-02-06 12:18:19 +00:00
2021-02-16 21:33:02 +00:00
const auto & layout_prefix = config_prefix + " .layout " ;
2020-04-17 17:01:18 +00:00
2021-02-16 21:33:02 +00:00
const auto dict_id = StorageID : : fromDictionaryConfig ( config , config_prefix ) ;
const DictionaryLifetime dict_lifetime { config , config_prefix + " .lifetime " } ;
2020-02-06 12:18:19 +00:00
const bool allow_read_expired_keys =
config . getBool ( layout_prefix + " .cache.allow_read_expired_keys " , false ) ;
2021-02-16 21:33:02 +00:00
auto storage_configuration = parseCacheStorageConfiguration ( full_name , config , layout_prefix , dict_lifetime , false ) ;
auto storage = std : : make_shared < CacheDictionaryStorage < DictionaryKeyType : : simple > > ( storage_configuration ) ;
2020-02-06 12:18:19 +00:00
2021-02-16 21:33:02 +00:00
auto update_queue_configuration = parseCacheDictionaryUpdateQueueConfiguration ( full_name , config , layout_prefix , false ) ;
2020-04-17 17:01:18 +00:00
2021-02-16 21:33:02 +00:00
return std : : make_unique < CacheDictionary < DictionaryKeyType : : simple > > (
2020-07-14 19:19:17 +00:00
dict_id ,
2020-04-17 17:01:18 +00:00
dict_struct ,
std : : move ( source_ptr ) ,
2021-02-16 21:33:02 +00:00
storage ,
update_queue_configuration ,
2020-04-17 17:01:18 +00:00
dict_lifetime ,
2021-02-16 21:33:02 +00:00
allow_read_expired_keys ) ;
2018-11-28 11:37:12 +00:00
} ;
2020-02-06 12:18:19 +00:00
2021-02-16 21:33:02 +00:00
factory . registerLayout ( " cache " , create_simple_cache_layout , false ) ;
2020-02-06 12:18:19 +00:00
2021-02-16 21:33:02 +00:00
auto create_complex_key_cache_layout = [ = ] ( const std : : string & full_name ,
const DictionaryStructure & dict_struct ,
const Poco : : Util : : AbstractConfiguration & config ,
const std : : string & config_prefix ,
DictionarySourcePtr source_ptr ) - > DictionaryPtr
2020-04-17 17:01:18 +00:00
{
2021-02-16 21:33:02 +00:00
if ( dict_struct . id )
throw Exception { " 'id' is not supported for dictionary of layout 'complex_key_cache' " ,
ErrorCodes : : UNSUPPORTED_METHOD } ;
2020-04-17 17:01:18 +00:00
2021-02-16 21:33:02 +00:00
if ( dict_struct . range_min | | dict_struct . range_max )
throw Exception { full_name
+ " : elements .structure.range_min and .structure.range_max should be defined only "
" for a dictionary of layout 'range_hashed' " ,
ErrorCodes : : BAD_ARGUMENTS } ;
2020-04-17 17:01:18 +00:00
2021-02-16 21:33:02 +00:00
const bool require_nonempty = config . getBool ( config_prefix + " .require_nonempty " , false ) ;
if ( require_nonempty )
throw Exception { full_name + " : dictionary of layout 'cache' cannot have 'require_nonempty' attribute set " ,
ErrorCodes : : BAD_ARGUMENTS } ;
2020-02-06 12:18:19 +00:00
2021-02-16 21:33:02 +00:00
const auto & layout_prefix = config_prefix + " .layout " ;
2020-09-28 14:48:32 +00:00
2021-02-16 21:33:02 +00:00
const auto dict_id = StorageID : : fromDictionaryConfig ( config , config_prefix ) ;
2020-09-28 14:48:32 +00:00
2021-02-16 21:33:02 +00:00
const DictionaryLifetime dict_lifetime { config , config_prefix + " .lifetime " } ;
2020-09-28 14:48:32 +00:00
2021-02-16 21:33:02 +00:00
const bool allow_read_expired_keys =
config . getBool ( layout_prefix + " .cache.allow_read_expired_keys " , false ) ;
2020-02-06 12:18:19 +00:00
2021-02-16 21:33:02 +00:00
auto storage_configuration = parseCacheStorageConfiguration ( full_name , config , layout_prefix , dict_lifetime , true ) ;
auto storage = std : : make_shared < CacheDictionaryStorage < DictionaryKeyType : : complex > > ( storage_configuration ) ;
2020-09-17 18:57:57 +00:00
2021-02-16 21:33:02 +00:00
auto update_queue_configuration = parseCacheDictionaryUpdateQueueConfiguration ( full_name , config , layout_prefix , true ) ;
2020-02-06 12:18:19 +00:00
2021-02-16 21:33:02 +00:00
return std : : make_unique < CacheDictionary < DictionaryKeyType : : complex > > (
dict_id ,
dict_struct ,
std : : move ( source_ptr ) ,
storage ,
update_queue_configuration ,
dict_lifetime ,
allow_read_expired_keys ) ;
} ;
2020-02-06 12:18:19 +00:00
2021-02-16 21:33:02 +00:00
factory . registerLayout ( " complex_key_cache " , create_complex_key_cache_layout , true ) ;
2020-02-06 12:18:19 +00:00
2021-02-21 12:50:55 +00:00
# if defined(OS_LINUX) || defined(__FreeBSD__)
2021-02-16 21:33:02 +00:00
auto create_simple_ssd_cache_layout = [ = ] ( const std : : string & full_name ,
const DictionaryStructure & dict_struct ,
const Poco : : Util : : AbstractConfiguration & config ,
const std : : string & config_prefix ,
DictionarySourcePtr source_ptr ) - > DictionaryPtr
{
if ( dict_struct . key )
throw Exception { " 'key' is not supported for dictionary of layout 'cache' " ,
ErrorCodes : : UNSUPPORTED_METHOD } ;
2020-02-28 12:34:39 +00:00
2021-02-16 21:33:02 +00:00
if ( dict_struct . range_min | | dict_struct . range_max )
throw Exception { full_name
+ " : elements .structure.range_min and .structure.range_max should be defined only "
" for a dictionary of layout 'range_hashed' " ,
ErrorCodes : : BAD_ARGUMENTS } ;
2020-02-28 12:34:39 +00:00
2021-02-16 21:33:02 +00:00
const bool require_nonempty = config . getBool ( config_prefix + " .require_nonempty " , false ) ;
if ( require_nonempty )
throw Exception { full_name + " : dictionary of layout 'cache' cannot have 'require_nonempty' attribute set " ,
ErrorCodes : : BAD_ARGUMENTS } ;
2020-02-06 12:18:19 +00:00
2021-02-16 21:33:02 +00:00
const auto & layout_prefix = config_prefix + " .layout " ;
2020-02-06 12:18:19 +00:00
2021-02-16 21:33:02 +00:00
const auto dict_id = StorageID : : fromDictionaryConfig ( config , config_prefix ) ;
2020-02-06 12:18:19 +00:00
2021-02-16 21:33:02 +00:00
const DictionaryLifetime dict_lifetime { config , config_prefix + " .lifetime " } ;
2020-09-17 18:57:57 +00:00
2021-02-16 21:33:02 +00:00
const bool allow_read_expired_keys =
config . getBool ( layout_prefix + " .cache.allow_read_expired_keys " , false ) ;
2020-02-06 12:18:19 +00:00
2021-02-16 21:33:02 +00:00
auto storage_configuration = parseSSDCacheStorageConfiguration ( full_name , config , layout_prefix , dict_lifetime , false ) ;
auto storage = std : : make_shared < SSDCacheDictionaryStorage < DictionaryKeyType : : simple > > ( storage_configuration ) ;
2020-02-06 12:18:19 +00:00
2021-02-16 21:33:02 +00:00
auto update_queue_configuration = parseCacheDictionaryUpdateQueueConfiguration ( full_name , config , layout_prefix , false ) ;
2020-09-17 18:57:57 +00:00
2021-02-16 21:33:02 +00:00
return std : : make_unique < CacheDictionary < DictionaryKeyType : : simple > > (
dict_id ,
dict_struct ,
std : : move ( source_ptr ) ,
storage ,
update_queue_configuration ,
dict_lifetime ,
allow_read_expired_keys ) ;
} ;
2020-09-24 15:53:14 +00:00
2021-02-16 21:33:02 +00:00
factory . registerLayout ( " ssd_cache " , create_simple_ssd_cache_layout , false ) ;
2020-09-17 18:57:57 +00:00
2021-02-16 21:33:02 +00:00
auto create_complex_key_ssd_cache_layout = [ = ] ( const std : : string & full_name ,
const DictionaryStructure & dict_struct ,
const Poco : : Util : : AbstractConfiguration & config ,
const std : : string & config_prefix ,
DictionarySourcePtr source_ptr ) - > DictionaryPtr
{
if ( dict_struct . id )
throw Exception { " 'id' is not supported for dictionary of layout 'complex_key_cache' " ,
ErrorCodes : : UNSUPPORTED_METHOD } ;
2020-02-06 12:18:19 +00:00
2021-02-16 21:33:02 +00:00
if ( dict_struct . range_min | | dict_struct . range_max )
throw Exception { full_name
+ " : elements .structure.range_min and .structure.range_max should be defined only "
" for a dictionary of layout 'range_hashed' " ,
ErrorCodes : : BAD_ARGUMENTS } ;
2020-02-06 12:18:19 +00:00
2021-02-16 21:33:02 +00:00
const bool require_nonempty = config . getBool ( config_prefix + " .require_nonempty " , false ) ;
if ( require_nonempty )
throw Exception { full_name + " : dictionary of layout 'cache' cannot have 'require_nonempty' attribute set " ,
ErrorCodes : : BAD_ARGUMENTS } ;
2020-02-06 12:18:19 +00:00
2021-02-16 21:33:02 +00:00
const auto & layout_prefix = config_prefix + " .layout " ;
2020-02-06 12:18:19 +00:00
2021-02-16 21:33:02 +00:00
const auto dict_id = StorageID : : fromDictionaryConfig ( config , config_prefix ) ;
2020-07-20 12:34:29 +00:00
2021-02-16 21:33:02 +00:00
const DictionaryLifetime dict_lifetime { config , config_prefix + " .lifetime " } ;
2020-10-02 14:26:39 +00:00
2021-02-16 21:33:02 +00:00
const bool allow_read_expired_keys =
config . getBool ( layout_prefix + " .cache.allow_read_expired_keys " , false ) ;
2020-02-06 12:18:19 +00:00
2021-02-16 21:33:02 +00:00
auto storage_configuration = parseSSDCacheStorageConfiguration ( full_name , config , layout_prefix , dict_lifetime , true ) ;
auto storage = std : : make_shared < SSDCacheDictionaryStorage < DictionaryKeyType : : complex > > ( storage_configuration ) ;
2020-02-06 12:18:19 +00:00
2021-02-16 21:33:02 +00:00
auto update_queue_configuration = parseCacheDictionaryUpdateQueueConfiguration ( full_name , config , layout_prefix , true ) ;
2020-10-02 15:47:07 +00:00
2021-02-16 21:33:02 +00:00
return std : : make_unique < CacheDictionary < DictionaryKeyType : : complex > > (
dict_id ,
dict_struct ,
std : : move ( source_ptr ) ,
storage ,
update_queue_configuration ,
dict_lifetime ,
allow_read_expired_keys ) ;
} ;
2020-02-06 12:18:19 +00:00
2021-02-16 21:33:02 +00:00
factory . registerLayout ( " complex_key_ssd_cache " , create_complex_key_ssd_cache_layout , true ) ;
2021-02-21 12:50:55 +00:00
# endif
2020-02-06 12:18:19 +00:00
}
2019-12-26 18:56:34 +00:00
2016-06-07 21:07:44 +00:00
}