2018-11-28 11:37:12 +00:00
# include "CacheDictionary.h"
2017-04-27 17:16:24 +00:00
# include <memory>
2021-02-16 21:33:02 +00:00
# include <ext/range.h>
# include <ext/size.h>
# include <ext/map.h>
# include <ext/chrono_io.h>
# include <Core/Defines.h>
2018-12-10 15:25:45 +00:00
# include <Common/CurrentMetrics.h>
2017-04-01 09:19:00 +00:00
# include <Common/HashTable/Hash.h>
2021-02-16 21:33:02 +00:00
# include <Common/HashTable/HashSet.h>
2017-04-08 01:32:05 +00:00
# include <Common/ProfileEvents.h>
2018-12-10 15:25:45 +00:00
# include <Common/ProfilingScopedRWLock.h>
2021-03-24 16:31:00 +00:00
2021-02-16 21:33:02 +00:00
# include <Dictionaries/DictionaryBlockInputStream.h>
2021-03-24 16:31:00 +00:00
# include <Dictionaries/HierarchyDictionariesUtils.h>
2020-04-20 02:31:21 +00:00
2017-04-08 01:32:05 +00:00
/// Profile event counters used by the cache dictionary. They are only declared here.
namespace ProfileEvents
{
    /// Keys handed to the source during update, and how that request was resolved.
    extern const Event DictCacheKeysRequested;
    extern const Event DictCacheKeysRequestedMiss;
    extern const Event DictCacheKeysRequestedFound;

    /// Outcome of a lookup in the cache storage.
    extern const Event DictCacheKeysExpired;
    extern const Event DictCacheKeysNotFound;
    extern const Event DictCacheKeysHit;

    /// Requests to the source.
    extern const Event DictCacheRequestTimeNs;
    extern const Event DictCacheRequests;

    /// Time spent waiting for the storage read-write lock.
    extern const Event DictCacheLockWriteNs;
    extern const Event DictCacheLockReadNs;
}
namespace CurrentMetrics
{
2018-12-10 15:25:45 +00:00
extern const Metric DictCacheRequests ;
2017-04-08 01:32:05 +00:00
}
2016-06-07 21:07:44 +00:00
namespace DB
{
/// Error codes thrown from this file. They are only declared here.
namespace ErrorCodes
{
    /// Thrown when an update from the source fails or is postponed by backoff.
    extern const int CACHE_DICTIONARY_UPDATE_FAIL;
    /// Thrown when the source does not support selective load.
    extern const int UNSUPPORTED_METHOD;
}
2021-02-16 21:33:02 +00:00
/** Creates a cache dictionary on top of the given source and cache storage.
  *
  * The update queue is given a callback that forwards every queued update unit to update().
  *
  * Throws UNSUPPORTED_METHOD if the source does not support selective load.
  */
template <DictionaryKeyType dictionary_key_type>
CacheDictionary<dictionary_key_type>::CacheDictionary(
    const StorageID & dict_id_,
    const DictionaryStructure & dict_struct_,
    DictionarySourcePtr source_ptr_,
    CacheDictionaryStoragePtr cache_storage_ptr_,
    CacheDictionaryUpdateQueueConfiguration update_queue_configuration_,
    DictionaryLifetime dict_lifetime_,
    bool allow_read_expired_keys_)
    : IDictionary(dict_id_)
    , dict_struct(dict_struct_)
    , source_ptr(std::move(source_ptr_))
    , cache_storage_ptr(cache_storage_ptr_)
    , update_queue(
        dict_id_.getNameForLogs(),
        update_queue_configuration_,
        [this](CacheDictionaryUpdateUnitPtr<dictionary_key_type> queued_unit) { update(queued_unit); })
    , dict_lifetime(dict_lifetime_)
    , log(&Poco::Logger::get(" ExternalDictionaries "))
    , allow_read_expired_keys(allow_read_expired_keys_)
    , rnd_engine(randomSeed())
{
    /// The dictionary requests individual keys from the source, so the source must support that.
    const bool selective_load_supported = source_ptr->supportsSelectiveLoad();

    if (!selective_load_supported)
        throw Exception(full_name + " : source cannot be used with CacheDictionary ", ErrorCodes::UNSUPPORTED_METHOD);
}
2021-02-16 21:33:02 +00:00
/// Stops the update queue and waits for it before the members are destroyed.
/// The queue callback captures `this` and calls update().
template <DictionaryKeyType dictionary_key_type>
CacheDictionary<dictionary_key_type>::~CacheDictionary()
{
    update_queue.stopAndWait();
}
/// Returns the size reported by the cache storage. The storage is read under the read lock.
template <DictionaryKeyType dictionary_key_type>
size_t CacheDictionary<dictionary_key_type>::getElementCount() const
{
    const ProfilingScopedReadRWLock storage_guard{rw_lock, ProfileEvents::DictCacheLockReadNs};

    const size_t element_count = cache_storage_ptr->getSize();
    return element_count;
}
2021-02-16 21:33:02 +00:00
/// Returns the number of bytes reported by the cache storage.
template <DictionaryKeyType dictionary_key_type>
size_t CacheDictionary<dictionary_key_type>::getBytesAllocated() const
{
    /// The storage is modified inside update() under the write lock, and update() is invoked
    /// by the update queue, so the size has to be read under the read lock.
    const ProfilingScopedReadRWLock storage_guard{rw_lock, ProfileEvents::DictCacheLockReadNs};

    const size_t bytes_allocated = cache_storage_ptr->getBytesAllocated();
    return bytes_allocated;
}
2021-02-17 18:19:04 +00:00
/// Returns the load factor reported by the cache storage. The storage is read under the read lock.
template <DictionaryKeyType dictionary_key_type>
double CacheDictionary<dictionary_key_type>::getLoadFactor() const
{
    const ProfilingScopedReadRWLock storage_guard{rw_lock, ProfileEvents::DictCacheLockReadNs};

    const double load_factor = cache_storage_ptr->getLoadFactor();
    return load_factor;
}
2021-02-16 21:33:02 +00:00
template < DictionaryKeyType dictionary_key_type >
std : : exception_ptr CacheDictionary < dictionary_key_type > : : getLastException ( ) const
{
const ProfilingScopedReadRWLock read_lock { rw_lock , ProfileEvents : : DictCacheLockReadNs } ;
return last_exception ;
2020-08-13 10:45:06 +00:00
}
2021-02-16 21:33:02 +00:00
/// Returns a non-owning pointer to the current source.
template <DictionaryKeyType dictionary_key_type>
const IDictionarySource * CacheDictionary<dictionary_key_type>::getSource() const
{
    /// The mutex is needed because getSourceAndUpdateIfNeeded() is used from another thread.
    std::lock_guard source_guard(source_mutex);

    const IDictionarySource * current_source = source_ptr.get();
    return current_source;
}
2016-06-07 21:07:44 +00:00
2021-02-16 21:33:02 +00:00
/// Single-attribute convenience wrapper: delegates to getColumns() with one-element lists
/// and returns the only resulting column.
template <DictionaryKeyType dictionary_key_type>
ColumnPtr CacheDictionary<dictionary_key_type>::getColumn(
    const std::string & attribute_name,
    const DataTypePtr & result_type,
    const Columns & key_columns,
    const DataTypes & key_types,
    const ColumnPtr & default_values_column) const
{
    const Strings single_attribute_name{attribute_name};
    const DataTypes single_result_type{result_type};
    const Columns single_default_values_column{default_values_column};

    const Columns result_columns
        = getColumns(single_attribute_name, single_result_type, key_columns, key_types, single_default_values_column);

    return result_columns.front();
}
2021-02-16 21:33:02 +00:00
/** Returns one column per requested attribute for the given keys.
  *
  * For complex keys the key types are validated against the dictionary structure first.
  * The keys are then extracted from the key columns and the work is delegated to getColumnsImpl().
  * The result types argument is not used.
  */
template <DictionaryKeyType dictionary_key_type>
Columns CacheDictionary<dictionary_key_type>::getColumns(
    const Strings & attribute_names,
    const DataTypes &,
    const Columns & key_columns,
    const DataTypes & key_types,
    const Columns & default_values_columns) const
{
    if (dictionary_key_type == DictionaryKeyType::complex)
        dict_struct.validateKeyTypes(key_types);

    /// The arena is handed to the extractor and has to outlive the extracted keys.
    Arena complex_keys_arena;
    DictionaryKeysExtractor<dictionary_key_type> keys_extractor(key_columns, complex_keys_arena);

    return getColumnsImpl(attribute_names, key_columns, keys_extractor.getKeys(), default_values_columns);
}
2021-01-23 13:18:24 +00:00
2021-02-16 21:33:02 +00:00
/** Fetches the requested attributes for already extracted keys.
  *
  * Flow of getColumnsImpl:
  * 1. Get fetch result from storage.
  * 2. If all keys are found in storage and none of them is expired:
  *    2.1. If storage returns fetched columns in order of keys, the result is returned to the client as is.
  *    2.2. Otherwise the fetched columns are reordered to the order of keys and returned.
  * 3. If all keys are found in storage, some of them are expired and reading expired keys is allowed,
  *    an asynchronous update is queued and the result from storage is returned as in step 2.
  * 4. Otherwise (some keys are not found, or expired keys may not be read) a synchronous update
  *    from the source is performed.
  * 5. Columns from storage and from the source are aggregated. If a key is found neither in storage
  *    nor in the source, the default value is used.
  *
  * Exceptions from the update queue (push or wait) propagate to the caller.
  */
template <DictionaryKeyType dictionary_key_type>
Columns CacheDictionary<dictionary_key_type>::getColumnsImpl(
    const Strings & attribute_names,
    const Columns & key_columns,
    const PaddedPODArray<KeyType> & keys,
    const Columns & default_values_columns) const
{
    DictionaryStorageFetchRequest request(dict_struct, attribute_names, default_values_columns);

    FetchResult result_of_fetch_from_storage;

    {
        /// Write lock on storage, the same as hasKeys() takes around the same fetchColumnsForKeys() call.
        /// Previously a read lock was taken here while the wait was accounted to DictCacheLockWriteNs.
        /// NOTE(review): presumably fetchColumnsForKeys() may modify storage state - confirm against the storage implementations.
        const ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs};

        result_of_fetch_from_storage = cache_storage_ptr->fetchColumnsForKeys(keys, request);
    }

    size_t found_keys_size = result_of_fetch_from_storage.found_keys_size;
    size_t expired_keys_size = result_of_fetch_from_storage.expired_keys_size;
    size_t not_found_keys_size = result_of_fetch_from_storage.not_found_keys_size;

    ProfileEvents::increment(ProfileEvents::DictCacheKeysHit, found_keys_size);
    ProfileEvents::increment(ProfileEvents::DictCacheKeysExpired, expired_keys_size);
    ProfileEvents::increment(ProfileEvents::DictCacheKeysNotFound, not_found_keys_size);

    query_count.fetch_add(keys.size());
    hit_count.fetch_add(found_keys_size);

    MutableColumns & fetched_columns_from_storage = result_of_fetch_from_storage.fetched_columns;
    const PaddedPODArray<KeyState> & key_index_to_state_from_storage = result_of_fetch_from_storage.key_index_to_state;

    const bool storage_returns_fetched_columns_in_order_of_keys = cache_storage_ptr->returnsFetchedColumnsInOrderOfRequestedKeys();

    /// Builds the final result from storage columns only (steps 2 and 3).
    auto make_result_from_storage = [&]() -> Columns
    {
        if (storage_returns_fetched_columns_in_order_of_keys)
            return request.filterRequestedColumns(fetched_columns_from_storage);

        /// Reorder result from storage to requested keys indexes
        MutableColumns columns_in_order_of_keys = aggregateColumnsInOrderOfKeys(
            keys,
            request,
            fetched_columns_from_storage,
            key_index_to_state_from_storage);

        return request.filterRequestedColumns(columns_in_order_of_keys);
    };

    const bool all_keys_found = not_found_keys_size == 0;

    /// All keys were found in storage and none of them is expired
    if (all_keys_found && expired_keys_size == 0)
        return make_result_from_storage();

    size_t keys_to_update_size = not_found_keys_size + expired_keys_size;
    auto update_unit = std::make_shared<CacheDictionaryUpdateUnit<dictionary_key_type>>(
        key_columns, key_index_to_state_from_storage, request, keys_to_update_size);

    if (all_keys_found && expired_keys_size > 0 && allow_read_expired_keys)
    {
        /// Start async update only if allow read expired keys and all keys are found
        update_queue.tryPushToUpdateQueueOrThrow(update_unit);

        return make_result_from_storage();
    }

    /// Start sync update
    update_queue.tryPushToUpdateQueueOrThrow(update_unit);
    update_queue.waitForCurrentUpdateFinish(update_unit);

    /// Take over what update() stored in the unit. No result columns are pre-allocated here,
    /// because they would be replaced by the columns of the unit anyway.
    HashMap<KeyType, size_t> requested_keys_to_fetched_columns_during_update_index;
    requested_keys_to_fetched_columns_during_update_index = std::move(update_unit->requested_keys_to_fetched_columns_during_update_index);
    MutableColumns fetched_columns_during_update = std::move(update_unit->fetched_columns_during_update);

    MutableColumns aggregated_columns = aggregateColumns(
        keys,
        request,
        fetched_columns_from_storage,
        key_index_to_state_from_storage,
        fetched_columns_during_update,
        requested_keys_to_fetched_columns_during_update_index);

    return request.filterRequestedColumns(aggregated_columns);
}
2021-02-16 21:33:02 +00:00
template < DictionaryKeyType dictionary_key_type >
2021-03-01 22:23:14 +00:00
ColumnUInt8 : : Ptr CacheDictionary < dictionary_key_type > : : hasKeys ( const Columns & key_columns , const DataTypes & key_types ) const
2021-01-23 13:18:24 +00:00
{
2021-03-04 14:34:39 +00:00
/**
* Flow of hasKeys . It is similar to getColumns . But there is an important detail , if key is identified with default value in storage
* it means that in hasKeys result this key will be false .
*
* 1. Get fetch result from storage
* 2. If all keys are found in storage and not expired and there are no default keys return that we have all keys .
2021-03-05 10:45:25 +00:00
* Otherwise set allow_expired_keys_during_aggregation and go to step 5.
2021-03-04 14:34:39 +00:00
* 3. If all keys are found in storage and some of them are expired and allow_read_expired keys is true return that we have all keys .
2021-03-05 10:45:25 +00:00
* Otherwise set allow_expired_keys_during_aggregation and go to step 5.
2021-03-04 14:34:39 +00:00
* 4. If not all keys are found in storage start sync update from source .
* 5. Start aggregation of keys from source and storage .
* If we allow read expired keys from step 2 or 3 then count them as founded in storage .
* Check if key was found in storage not default for that key set true in result array .
* Check that key was fetched during update for that key set true in result array .
*/
2021-03-01 22:23:14 +00:00
if ( dictionary_key_type = = DictionaryKeyType : : complex )
dict_struct . validateKeyTypes ( key_types ) ;
2021-02-27 20:39:34 +00:00
Arena complex_keys_arena ;
DictionaryKeysExtractor < dictionary_key_type > extractor ( key_columns , complex_keys_arena ) ;
2021-02-16 21:33:02 +00:00
const auto & keys = extractor . getKeys ( ) ;
2021-01-23 13:18:24 +00:00
2021-02-18 16:57:20 +00:00
/// We make empty request just to fetch if keys exists
2021-03-03 18:58:43 +00:00
DictionaryStorageFetchRequest request ( dict_struct , { } , { } ) ;
2020-12-27 17:23:09 +00:00
2021-02-16 21:33:02 +00:00
FetchResult result_of_fetch_from_storage ;
2021-01-23 13:18:24 +00:00
{
2021-02-17 11:48:06 +00:00
/// Write lock on storage
const ProfilingScopedWriteRWLock write_lock { rw_lock , ProfileEvents : : DictCacheLockWriteNs } ;
2021-01-23 13:18:24 +00:00
2021-03-02 13:07:17 +00:00
result_of_fetch_from_storage = cache_storage_ptr - > fetchColumnsForKeys ( keys , request ) ;
2021-02-16 21:33:02 +00:00
}
2021-01-23 13:18:24 +00:00
2021-02-26 15:56:41 +00:00
size_t found_keys_size = result_of_fetch_from_storage . found_keys_size ;
size_t expired_keys_size = result_of_fetch_from_storage . expired_keys_size ;
size_t not_found_keys_size = result_of_fetch_from_storage . not_found_keys_size ;
2021-01-23 13:18:24 +00:00
2021-02-26 15:56:41 +00:00
ProfileEvents : : increment ( ProfileEvents : : DictCacheKeysHit , found_keys_size ) ;
2021-02-16 21:33:02 +00:00
ProfileEvents : : increment ( ProfileEvents : : DictCacheKeysExpired , expired_keys_size ) ;
ProfileEvents : : increment ( ProfileEvents : : DictCacheKeysNotFound , not_found_keys_size ) ;
query_count . fetch_add ( keys . size ( ) ) ;
hit_count . fetch_add ( found_keys_size ) ;
2021-03-03 18:58:43 +00:00
size_t keys_to_update_size = expired_keys_size + not_found_keys_size ;
auto update_unit = std : : make_shared < CacheDictionaryUpdateUnit < dictionary_key_type > > ( key_columns , result_of_fetch_from_storage . key_index_to_state , request , keys_to_update_size ) ;
2021-01-23 13:18:24 +00:00
2021-02-16 21:33:02 +00:00
HashMap < KeyType , size_t > requested_keys_to_fetched_columns_during_update_index ;
2021-03-03 18:58:43 +00:00
bool allow_expired_keys_during_aggregation = false ;
2020-12-27 17:23:09 +00:00
2021-02-16 21:33:02 +00:00
if ( not_found_keys_size = = 0 & & expired_keys_size = = 0 )
2021-01-23 13:18:24 +00:00
{
2021-02-16 21:33:02 +00:00
/// All keys were found in storage
2021-03-03 18:58:43 +00:00
if ( result_of_fetch_from_storage . default_keys_size = = 0 )
return ColumnUInt8 : : create ( keys . size ( ) , true ) ;
allow_expired_keys_during_aggregation = true ;
2021-02-16 21:33:02 +00:00
}
else if ( not_found_keys_size = = 0 & & expired_keys_size > 0 & & allow_read_expired_keys )
{
/// Start async update only if allow read expired keys and all keys are found
update_queue . tryPushToUpdateQueueOrThrow ( update_unit ) ;
2021-01-23 13:18:24 +00:00
2021-03-03 18:58:43 +00:00
if ( result_of_fetch_from_storage . default_keys_size = = 0 )
return ColumnUInt8 : : create ( keys . size ( ) , true ) ;
allow_expired_keys_during_aggregation = true ;
2021-02-16 21:33:02 +00:00
}
2021-03-03 18:58:43 +00:00
else
2021-02-16 21:33:02 +00:00
{
/// Start sync update
update_queue . tryPushToUpdateQueueOrThrow ( update_unit ) ;
update_queue . waitForCurrentUpdateFinish ( update_unit ) ;
2021-01-23 13:18:24 +00:00
2021-02-16 21:33:02 +00:00
requested_keys_to_fetched_columns_during_update_index = std : : move ( update_unit - > requested_keys_to_fetched_columns_during_update_index ) ;
}
2021-01-23 13:18:24 +00:00
2021-02-16 21:33:02 +00:00
auto result = ColumnUInt8 : : create ( keys . size ( ) , false ) ;
auto & data = result - > getData ( ) ;
2021-01-23 13:18:24 +00:00
2021-02-16 21:33:02 +00:00
for ( size_t key_index = 0 ; key_index < keys . size ( ) ; + + key_index )
{
auto key = keys [ key_index ] ;
2021-01-23 13:18:24 +00:00
2021-03-03 18:58:43 +00:00
bool valid_expired_key = allow_expired_keys_during_aggregation & & result_of_fetch_from_storage . key_index_to_state [ key_index ] . isExpired ( ) ;
if ( result_of_fetch_from_storage . key_index_to_state [ key_index ] . isFound ( ) | | valid_expired_key )
2021-01-23 13:18:24 +00:00
{
2021-02-16 21:33:02 +00:00
/// Check if key was fetched from cache
2021-03-03 18:58:43 +00:00
data [ key_index ] = ! result_of_fetch_from_storage . key_index_to_state [ key_index ] . isDefault ( ) ;
2021-02-16 21:33:02 +00:00
}
2021-03-03 18:58:43 +00:00
if ( requested_keys_to_fetched_columns_during_update_index . has ( key ) )
2021-02-16 21:33:02 +00:00
{
/// Check if key was not in cache and was fetched during update
data [ key_index ] = true ;
2020-12-27 17:23:09 +00:00
}
2021-01-23 13:18:24 +00:00
}
2020-12-27 17:23:09 +00:00
2021-02-16 21:33:02 +00:00
return result ;
}
2021-03-24 16:31:00 +00:00
/// Hierarchy is served only for simple keys, through the default implementation; the query counter
/// is increased by the number of rows in the key column. For complex keys nullptr is returned.
template <DictionaryKeyType dictionary_key_type>
ColumnPtr CacheDictionary<dictionary_key_type>::getHierarchy(ColumnPtr key_column, const DataTypePtr & key_type) const
{
    if (dictionary_key_type != DictionaryKeyType::simple)
        return nullptr;

    auto hierarchy = getHierarchyDefaultImplementation(this, key_column, key_type);

    query_count.fetch_add(key_column->size(), std::memory_order_relaxed);

    return hierarchy;
}
/// The hierarchy check is served only for simple keys, through the default implementation; the query counter
/// is increased by the number of rows in the key column. For complex keys nullptr is returned.
template <DictionaryKeyType dictionary_key_type>
ColumnUInt8::Ptr CacheDictionary<dictionary_key_type>::isInHierarchy(ColumnPtr key_column, ColumnPtr in_key_column, const DataTypePtr & key_type) const
{
    if (dictionary_key_type != DictionaryKeyType::simple)
        return nullptr;

    auto is_in_hierarchy = isInHierarchyDefaultImplementation(this, key_column, in_key_column, key_type);

    query_count.fetch_add(key_column->size(), std::memory_order_relaxed);

    return is_in_hierarchy;
}
2021-02-17 18:19:04 +00:00
/** Rebuilds columns fetched from storage in the order of the requested keys.
  *
  * Used when the storage does not return fetched columns in order of keys.
  * For every attribute that has to be filled, the row of each key is taken from the fetched column
  * at the index recorded in the key state. Keys in the not-found state are skipped.
  */
template <DictionaryKeyType dictionary_key_type>
MutableColumns CacheDictionary<dictionary_key_type>::aggregateColumnsInOrderOfKeys(
    const PaddedPODArray<KeyType> & keys,
    const DictionaryStorageFetchRequest & request,
    const MutableColumns & fetched_columns,
    const PaddedPODArray<KeyState> & key_index_to_state)
{
    MutableColumns ordered_columns = request.makeAttributesResultColumns();

    const size_t attributes_size = request.attributesSize();
    const size_t keys_size = keys.size();

    for (size_t attribute_index = 0; attribute_index < attributes_size; ++attribute_index)
    {
        if (request.shouldFillResultColumnWithIndex(attribute_index))
        {
            const auto & ordered_column = ordered_columns[attribute_index];
            const auto & column_from_storage = fetched_columns[attribute_index];

            for (size_t key_index = 0; key_index < keys_size; ++key_index)
            {
                auto key_state = key_index_to_state[key_index];

                if (!key_state.isNotFound())
                    ordered_column->insertFrom(*column_from_storage, key_state.getFetchedColumnIndex());
            }
        }
    }

    return ordered_columns;
}
2021-02-16 21:33:02 +00:00
/** Merges columns fetched from storage with columns fetched from the source during update.
  *
  * For every attribute that has to be filled and for every key, in order of keys:
  *  - if the key state from storage is "found", the value is taken from the storage column;
  *  - otherwise, if the key was fetched during update, the value is taken from the update column;
  *  - otherwise the default value for that row is inserted.
  */
template <DictionaryKeyType dictionary_key_type>
MutableColumns CacheDictionary<dictionary_key_type>::aggregateColumns(
    const PaddedPODArray<KeyType> & keys,
    const DictionaryStorageFetchRequest & request,
    const MutableColumns & fetched_columns_from_storage,
    const PaddedPODArray<KeyState> & key_index_to_fetched_columns_from_storage_result,
    const MutableColumns & fetched_columns_during_update,
    const HashMap<KeyType, size_t> & found_keys_to_fetched_columns_during_update_index)
{
    MutableColumns merged_columns = request.makeAttributesResultColumns();

    const size_t attributes_size = request.attributesSize();
    const size_t keys_size = keys.size();

    for (size_t attribute_index = 0; attribute_index < attributes_size; ++attribute_index)
    {
        if (!request.shouldFillResultColumnWithIndex(attribute_index))
            continue;

        const auto & merged_column = merged_columns[attribute_index];
        const auto & column_from_storage = fetched_columns_from_storage[attribute_index];
        const auto & column_from_update = fetched_columns_during_update[attribute_index];
        const auto & defaults = request.defaultValueProviderAtIndex(attribute_index);

        for (size_t key_index = 0; key_index < keys_size; ++key_index)
        {
            auto state_from_storage = key_index_to_fetched_columns_from_storage_result[key_index];

            if (state_from_storage.isFound())
            {
                /// Key was fetched from cache
                merged_column->insertFrom(*column_from_storage, state_from_storage.getFetchedColumnIndex());
            }
            else if (const auto * fetched_during_update = found_keys_to_fetched_columns_during_update_index.find(keys[key_index]))
            {
                /// Key was not in cache and was fetched during update
                merged_column->insertFrom(*column_from_update, fetched_during_update->getMapped());
            }
            else
            {
                /// Key is unknown to both storage and source
                merged_column->insert(defaults.getDefaultValue(key_index));
            }
        }
    }

    return merged_columns;
}
/// Creates a stream over the keys that are currently cached in the storage.
/// The cached keys are read and the stream is constructed under the write lock.
template <DictionaryKeyType dictionary_key_type>
BlockInputStreamPtr CacheDictionary<dictionary_key_type>::getBlockInputStream(const Names & column_names, size_t max_block_size) const
{
    std::shared_ptr<DictionaryBlockInputStream> cached_keys_stream;

    {
        /// Write lock on storage
        const ProfilingScopedWriteRWLock storage_guard{rw_lock, ProfileEvents::DictCacheLockWriteNs};

        if constexpr (dictionary_key_type == DictionaryKeyType::simple)
        {
            /// Simple keys are passed as a temporary
            cached_keys_stream = std::make_shared<DictionaryBlockInputStream>(
                shared_from_this(), max_block_size, cache_storage_ptr->getCachedSimpleKeys(), column_names);
        }
        else
        {
            /// Complex keys are passed as a named object
            auto cached_complex_keys = cache_storage_ptr->getCachedComplexKeys();
            cached_keys_stream = std::make_shared<DictionaryBlockInputStream>(
                shared_from_this(), max_block_size, cached_complex_keys, column_names);
        }
    }

    return cached_keys_stream;
}
2021-01-23 13:18:24 +00:00
2021-02-16 21:33:02 +00:00
template < DictionaryKeyType dictionary_key_type >
2021-03-01 22:23:14 +00:00
void CacheDictionary < dictionary_key_type > : : update ( CacheDictionaryUpdateUnitPtr < dictionary_key_type > update_unit_ptr )
2021-02-16 21:33:02 +00:00
{
2021-03-04 14:34:39 +00:00
/**
* Update has following flow .
* 1. Filter only necessary keys to request , keys that are expired or not found .
* And create not_found_keys hash_set including each requested key .
* In case of simple_keys we need to fill requested_keys_vector with requested value key .
* In case of complex_keys we need to fill requested_complex_key_rows with requested row .
* 2. Create stream from source with necessary keys to request using method for simple or complex keys .
* 3. Create fetched columns during update variable . This columns will aggregate columns that we fetch from source .
* 4. When block is fetched from source . Split it into keys columns and attributes columns .
* Insert attributes columns into associated fetched columns during update .
* Create KeysExtractor and extract keys from keys columns .
* Update map of requested found key to fetched column index .
* Remove found key from not_found_keys .
* 5. Add aggregated columns during update into storage .
* 6. Add not found keys as default into storage .
*/
2021-02-16 21:33:02 +00:00
CurrentMetrics : : Increment metric_increment { CurrentMetrics : : DictCacheRequests } ;
2021-01-23 13:18:24 +00:00
2021-03-04 14:34:39 +00:00
size_t found_keys_size = 0 ;
2021-01-23 13:18:24 +00:00
2021-03-03 18:58:43 +00:00
DictionaryKeysExtractor < dictionary_key_type > requested_keys_extractor ( update_unit_ptr - > key_columns , update_unit_ptr - > complex_key_arena ) ;
const auto & requested_keys = requested_keys_extractor . getKeys ( ) ;
HashSet < KeyType > not_found_keys ;
2021-02-16 21:33:02 +00:00
std : : vector < UInt64 > requested_keys_vector ;
2021-03-03 18:58:43 +00:00
std : : vector < size_t > requested_complex_key_rows ;
2021-01-23 13:18:24 +00:00
2021-03-03 18:58:43 +00:00
auto & key_index_to_state_from_storage = update_unit_ptr - > key_index_to_state ;
2021-01-23 13:18:24 +00:00
2021-03-03 18:58:43 +00:00
for ( size_t i = 0 ; i < key_index_to_state_from_storage . size ( ) ; + + i )
2021-02-16 21:33:02 +00:00
{
2021-03-03 18:58:43 +00:00
if ( key_index_to_state_from_storage [ i ] . isExpired ( )
| | key_index_to_state_from_storage [ i ] . isNotFound ( ) )
{
if constexpr ( dictionary_key_type = = DictionaryKeyType : : simple )
requested_keys_vector . emplace_back ( requested_keys [ i ] ) ;
else
requested_complex_key_rows . emplace_back ( i ) ;
2021-01-23 13:18:24 +00:00
2021-03-03 18:58:43 +00:00
auto requested_key = requested_keys [ i ] ;
not_found_keys . insert ( requested_key ) ;
}
2021-02-16 21:33:02 +00:00
}
2021-03-03 18:58:43 +00:00
2021-03-04 14:34:39 +00:00
size_t requested_keys_size = update_unit_ptr - > keys_to_update_size ;
2021-03-01 22:23:14 +00:00
ProfileEvents : : increment ( ProfileEvents : : DictCacheKeysRequested , requested_keys_size ) ;
2021-02-16 21:33:02 +00:00
const auto & fetch_request = update_unit_ptr - > request ;
2020-09-17 18:57:57 +00:00
2021-02-16 21:33:02 +00:00
const auto now = std : : chrono : : system_clock : : now ( ) ;
2016-06-07 21:07:44 +00:00
2021-02-16 21:33:02 +00:00
if ( now > backoff_end_time . load ( ) )
2020-10-02 19:09:48 +00:00
{
2021-02-16 21:33:02 +00:00
try
{
auto current_source_ptr = getSourceAndUpdateIfNeeded ( ) ;
2020-10-02 19:09:48 +00:00
2021-02-16 21:33:02 +00:00
Stopwatch watch ;
BlockInputStreamPtr stream ;
2020-10-02 19:09:48 +00:00
2021-02-16 21:33:02 +00:00
if constexpr ( dictionary_key_type = = DictionaryKeyType : : simple )
stream = current_source_ptr - > loadIds ( requested_keys_vector ) ;
else
2021-03-03 18:58:43 +00:00
stream = current_source_ptr - > loadKeys ( update_unit_ptr - > key_columns , requested_complex_key_rows ) ;
2020-11-13 16:16:56 +00:00
2021-02-16 21:33:02 +00:00
stream - > readPrefix ( ) ;
2020-10-02 19:09:48 +00:00
2021-02-16 21:33:02 +00:00
size_t skip_keys_size_offset = dict_struct . getKeysSize ( ) ;
2021-03-03 18:58:43 +00:00
PaddedPODArray < KeyType > found_keys_in_source ;
Columns fetched_columns_during_update = fetch_request . makeAttributesResultColumnsNonMutable ( ) ;
2017-04-01 07:20:54 +00:00
2021-02-16 21:33:02 +00:00
while ( Block block = stream - > read ( ) )
2017-04-01 07:20:54 +00:00
{
2021-02-16 21:33:02 +00:00
Columns key_columns ;
key_columns . reserve ( skip_keys_size_offset ) ;
2017-04-01 07:20:54 +00:00
2021-02-16 21:33:02 +00:00
auto block_columns = block . getColumns ( ) ;
2017-02-14 18:06:21 +00:00
2021-03-03 18:58:43 +00:00
/// Split into keys columns and attribute columns
2021-02-16 21:33:02 +00:00
for ( size_t i = 0 ; i < skip_keys_size_offset ; + + i )
{
key_columns . emplace_back ( * block_columns . begin ( ) ) ;
block_columns . erase ( block_columns . begin ( ) ) ;
}
2020-12-27 17:23:09 +00:00
2021-02-27 20:39:34 +00:00
DictionaryKeysExtractor < dictionary_key_type > keys_extractor ( key_columns , update_unit_ptr - > complex_key_arena ) ;
2021-03-04 14:34:39 +00:00
const auto & keys_extracted_from_block = keys_extractor . getKeys ( ) ;
2020-12-27 17:23:09 +00:00
2021-03-03 18:58:43 +00:00
for ( size_t index_of_attribute = 0 ; index_of_attribute < fetched_columns_during_update . size ( ) ; + + index_of_attribute )
2021-02-16 21:33:02 +00:00
{
2021-03-03 18:58:43 +00:00
auto & column_to_update = fetched_columns_during_update [ index_of_attribute ] ;
auto column = block . safeGetByPosition ( skip_keys_size_offset + index_of_attribute ) . column ;
2021-03-04 14:34:39 +00:00
column_to_update - > assumeMutable ( ) - > insertRangeFrom ( * column , 0 , keys_extracted_from_block . size ( ) ) ;
2021-02-16 21:33:02 +00:00
}
2017-04-01 07:20:54 +00:00
2021-03-04 14:34:39 +00:00
for ( size_t i = 0 ; i < keys_extracted_from_block . size ( ) ; + + i )
2021-02-16 21:33:02 +00:00
{
2021-03-04 14:34:39 +00:00
auto fetched_key_from_source = keys_extracted_from_block [ i ] ;
2021-03-03 18:58:43 +00:00
not_found_keys . erase ( fetched_key_from_source ) ;
2021-03-04 14:34:39 +00:00
update_unit_ptr - > requested_keys_to_fetched_columns_during_update_index [ fetched_key_from_source ] = found_keys_size ;
2021-03-03 18:58:43 +00:00
found_keys_in_source . emplace_back ( fetched_key_from_source ) ;
2021-03-04 14:34:39 +00:00
+ + found_keys_size ;
2021-02-16 21:33:02 +00:00
}
2021-03-03 18:58:43 +00:00
}
2021-02-18 16:57:20 +00:00
2021-03-03 18:58:43 +00:00
PaddedPODArray < KeyType > not_found_keys_in_source ;
not_found_keys_in_source . reserve ( not_found_keys . size ( ) ) ;
for ( auto & cell : not_found_keys )
not_found_keys_in_source . emplace_back ( cell . getKey ( ) ) ;
auto & update_unit_ptr_mutable_columns = update_unit_ptr - > fetched_columns_during_update ;
for ( const auto & fetched_column : fetched_columns_during_update )
update_unit_ptr_mutable_columns . emplace_back ( fetched_column - > assumeMutable ( ) ) ;
2021-02-16 21:33:02 +00:00
stream - > readSuffix ( ) ;
2020-09-17 18:57:57 +00:00
2021-03-04 12:11:07 +00:00
{
/// Lock for cache modification
ProfilingScopedWriteRWLock write_lock { rw_lock , ProfileEvents : : DictCacheLockWriteNs } ;
cache_storage_ptr - > insertColumnsForKeys ( found_keys_in_source , fetched_columns_during_update ) ;
cache_storage_ptr - > insertDefaultKeys ( not_found_keys_in_source ) ;
error_count = 0 ;
last_exception = std : : exception_ptr { } ;
backoff_end_time = std : : chrono : : system_clock : : time_point { } ;
}
2017-04-01 07:20:54 +00:00
2021-02-16 21:33:02 +00:00
ProfileEvents : : increment ( ProfileEvents : : DictCacheRequestTimeNs , watch . elapsed ( ) ) ;
}
catch ( . . . )
2017-04-01 07:20:54 +00:00
{
2021-02-16 21:33:02 +00:00
/// Lock just for last_exception safety
ProfilingScopedWriteRWLock write_lock { rw_lock , ProfileEvents : : DictCacheLockWriteNs } ;
+ + error_count ;
last_exception = std : : current_exception ( ) ;
backoff_end_time = now + std : : chrono : : seconds ( calculateDurationWithBackoff ( rnd_engine , error_count ) ) ;
2020-02-06 12:18:19 +00:00
2021-02-16 21:33:02 +00:00
tryLogException ( last_exception , log ,
" Could not update cache dictionary ' " + getDictionaryID ( ) . getNameForLogs ( ) +
" ', next update is scheduled at " + ext : : to_string ( backoff_end_time . load ( ) ) ) ;
try
2020-11-13 16:16:56 +00:00
{
2021-02-16 21:33:02 +00:00
std : : rethrow_exception ( last_exception ) ;
2020-11-13 16:16:56 +00:00
}
2021-02-16 21:33:02 +00:00
catch ( . . . )
2020-11-13 16:16:56 +00:00
{
2021-02-16 21:33:02 +00:00
throw DB : : Exception ( ErrorCodes : : CACHE_DICTIONARY_UPDATE_FAIL ,
" Update failed for dictionary {} : {} " ,
getDictionaryID ( ) . getNameForLogs ( ) ,
getCurrentExceptionMessage ( true /*with stack trace*/ ,
true /*check embedded stack trace*/ ) ) ;
2020-11-13 16:16:56 +00:00
}
2017-04-01 07:20:54 +00:00
}
2021-02-16 21:33:02 +00:00
2021-03-04 14:34:39 +00:00
ProfileEvents : : increment ( ProfileEvents : : DictCacheKeysRequestedMiss , requested_keys_size - found_keys_size ) ;
ProfileEvents : : increment ( ProfileEvents : : DictCacheKeysRequestedFound , found_keys_size ) ;
2021-02-16 21:33:02 +00:00
ProfileEvents : : increment ( ProfileEvents : : DictCacheRequests ) ;
}
else
{
/// Won't request source for keys
throw DB : : Exception ( ErrorCodes : : CACHE_DICTIONARY_UPDATE_FAIL ,
" Query contains keys that are not present in cache or expired. Could not update cache dictionary {} now, because nearest update is scheduled at {}. Try again later. " ,
getDictionaryID ( ) . getNameForLogs ( ) ,
ext : : to_string ( backoff_end_time . load ( ) ) ) ;
2017-04-01 07:20:54 +00:00
}
2021-02-16 21:33:02 +00:00
}
2017-04-01 07:20:54 +00:00
2021-02-16 21:33:02 +00:00
/// Explicit instantiations for both key types: the member templates are defined in this file.
template class CacheDictionary<DictionaryKeyType::simple>;
template class CacheDictionary<DictionaryKeyType::complex>;
2017-04-01 07:20:54 +00:00
2016-06-07 21:07:44 +00:00
}