2018-11-28 11:37:12 +00:00
# include "CacheDictionary.h"
2017-04-27 17:16:24 +00:00
# include <memory>
# include <Columns/ColumnString.h>
2017-04-01 09:19:00 +00:00
# include <Common/BitHelpers.h>
2018-12-10 15:25:45 +00:00
# include <Common/CurrentMetrics.h>
2017-04-01 09:19:00 +00:00
# include <Common/HashTable/Hash.h>
2017-04-08 01:32:05 +00:00
# include <Common/ProfileEvents.h>
2018-12-10 15:25:45 +00:00
# include <Common/ProfilingScopedRWLock.h>
# include <Common/randomSeed.h>
2017-07-13 20:58:19 +00:00
# include <Common/typeid_cast.h>
2020-04-20 02:31:21 +00:00
# include <Core/Defines.h>
2020-11-17 14:36:04 +00:00
# include <IO/WriteBufferFromOStream.h>
2018-12-10 15:25:45 +00:00
# include <ext/range.h>
# include <ext/size.h>
2021-01-23 13:18:24 +00:00
# include <ext/map.h>
# include <ext/chrono_io.h>
2020-02-06 12:18:19 +00:00
# include <Common/setThreadName.h>
2021-01-21 14:42:50 +00:00
# include <DataTypes/DataTypesDecimal.h>
2018-12-10 15:25:45 +00:00
# include "DictionaryBlockInputStream.h"
# include "DictionaryFactory.h"
2020-12-27 17:23:09 +00:00
# include <Functions/FunctionHelpers.h>
2020-04-20 02:31:21 +00:00
2017-04-08 01:32:05 +00:00
/// Profile events emitted by the cache dictionary (defined in ProfileEvents.cpp).
namespace ProfileEvents
{
    extern const Event DictCacheKeysRequested;
    extern const Event DictCacheKeysRequestedMiss;
    extern const Event DictCacheKeysRequestedFound;
    extern const Event DictCacheKeysExpired;
    extern const Event DictCacheKeysNotFound;
    extern const Event DictCacheKeysHit;
    extern const Event DictCacheRequestTimeNs;
    extern const Event DictCacheRequests;
    extern const Event DictCacheLockWriteNs;
    extern const Event DictCacheLockReadNs;
}
namespace CurrentMetrics
{
2018-12-10 15:25:45 +00:00
extern const Metric DictCacheRequests ;
2017-04-08 01:32:05 +00:00
}
2016-06-07 21:07:44 +00:00
namespace DB
{
/// Error codes this translation unit may throw (declared in ErrorCodes.cpp).
namespace ErrorCodes
{
    extern const int CACHE_DICTIONARY_UPDATE_FAIL;
    extern const int TYPE_MISMATCH;
    extern const int BAD_ARGUMENTS;
    extern const int UNSUPPORTED_METHOD;
    extern const int TOO_SMALL_BUFFER_SIZE;
    extern const int TIMEOUT_EXCEEDED;
}
2017-09-01 17:21:03 +00:00
/// Map an id to its home slot in the open-addressing cell table.
/// `size` is a power of two, so masking with `size_overlap_mask` (= size - 1)
/// is equivalent to `hash % size`.
inline size_t CacheDictionary::getCellIdx(const Key id) const
{
    return intHash64(id) & size_overlap_mask;
}
2018-12-10 15:25:45 +00:00
CacheDictionary::CacheDictionary(
    const StorageID & dict_id_,
    const DictionaryStructure & dict_struct_,
    DictionarySourcePtr source_ptr_,
    DictionaryLifetime dict_lifetime_,
    size_t strict_max_lifetime_seconds_,
    size_t size_,
    bool allow_read_expired_keys_,
    size_t max_update_queue_size_,
    size_t update_queue_push_timeout_milliseconds_,
    size_t query_wait_timeout_milliseconds_,
    size_t max_threads_for_updates_)
    : IDictionary(dict_id_)
    , dict_struct(dict_struct_)
    , source_ptr{std::move(source_ptr_)}
    , dict_lifetime(dict_lifetime_)
    , strict_max_lifetime_seconds(strict_max_lifetime_seconds_)
    , allow_read_expired_keys(allow_read_expired_keys_)
    , max_update_queue_size(max_update_queue_size_)
    , update_queue_push_timeout_milliseconds(update_queue_push_timeout_milliseconds_)
    , query_wait_timeout_milliseconds(query_wait_timeout_milliseconds_)
    , max_threads_for_updates(max_threads_for_updates_)
    , log(&Poco::Logger::get("ExternalDictionaries"))
    /// Cell count is rounded up to a power of two (at least max_collision_length)
    /// so cell lookup can use masking instead of modulo.
    , size{roundUpToPowerOfTwoOrZero(std::max(size_, size_t(max_collision_length)))}
    , size_overlap_mask{this->size - 1}
    , cells{this->size}
    , rnd_engine(randomSeed())
    , update_queue(max_update_queue_size_)
    , update_pool(max_threads_for_updates)
{
    /// A cache layout only makes sense for sources that can load a subset of keys.
    if (!source_ptr->supportsSelectiveLoad())
        throw Exception{full_name + ": source cannot be used with CacheDictionary", ErrorCodes::UNSUPPORTED_METHOD};

    createAttributes();

    /// Spawn the background workers that serve the update queue.
    for (size_t i = 0; i < max_threads_for_updates; ++i)
        update_pool.scheduleOrThrowOnError([this] { updateThreadFunction(); });
}
CacheDictionary::~CacheDictionary()
{
    /// Signal the worker threads to stop and drop any pending work.
    finished = true;
    update_queue.clear();

    /// Push one empty "sentinel" unit per worker so that every thread blocked
    /// on the queue wakes up and observes `finished`.
    for (size_t i = 0; i < max_threads_for_updates; ++i)
    {
        auto sentinel = std::make_shared<UpdateUnit>(std::vector<Key>());
        update_queue.push(sentinel);
    }

    update_pool.wait();
}
2020-08-13 10:45:06 +00:00
size_t CacheDictionary::getBytesAllocated() const
{
    /// In case of existing string arena we check the size of it.
    /// But the same appears in setAttributeValue() function, which is called from update() function
    /// which in turn is called from another thread, hence the read lock.
    const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs};
    return bytes_allocated + (string_arena ? string_arena->size() : 0);
}
const IDictionarySource * CacheDictionary::getSource() const
{
    /// Mutex required here because of the getSourceAndUpdateIfNeeded() function
    /// which is used from another thread.
    std::lock_guard lock(source_mutex);
    return source_ptr.get();
}
2016-06-07 21:07:44 +00:00
2016-08-07 09:09:18 +00:00
void CacheDictionary : : toParent ( const PaddedPODArray < Key > & ids , PaddedPODArray < Key > & out ) const
2016-06-07 21:07:44 +00:00
{
2020-11-16 15:30:16 +00:00
const auto null_value = std : : get < UInt64 > ( hierarchical_attribute - > null_value ) ;
2021-01-23 13:18:24 +00:00
DictionaryDefaultValueExtractor < UInt64 > default_value_extractor ( null_value ) ;
getItemsNumberImpl < UInt64 , UInt64 > ( * hierarchical_attribute , ids , out , default_value_extractor ) ;
2016-06-07 21:07:44 +00:00
}
2017-03-25 23:42:04 +00:00
/// Allow to use single value in same way as array.
static inline CacheDictionary::Key getAt(const PaddedPODArray<CacheDictionary::Key> & arr, const size_t idx)
{
    return arr[idx];
}

static inline CacheDictionary::Key getAt(const CacheDictionary::Key & value, const size_t)
{
    return value;
}
2017-03-25 23:42:04 +00:00
template < typename AncestorType >
2018-12-10 15:25:45 +00:00
void CacheDictionary : : isInImpl ( const PaddedPODArray < Key > & child_ids , const AncestorType & ancestor_ids , PaddedPODArray < UInt8 > & out ) const
2017-03-25 23:42:04 +00:00
{
2017-04-01 07:20:54 +00:00
/// Transform all children to parents until ancestor id or null_value will be reached.
2018-08-10 04:02:56 +00:00
size_t out_size = out . size ( ) ;
2018-12-10 15:25:45 +00:00
memset ( out . data ( ) , 0xFF , out_size ) ; /// 0xFF means "not calculated"
2017-04-01 07:20:54 +00:00
2020-11-16 15:30:16 +00:00
const auto null_value = std : : get < UInt64 > ( hierarchical_attribute - > null_value ) ;
2017-04-01 07:20:54 +00:00
2019-02-26 14:52:55 +00:00
PaddedPODArray < Key > children ( out_size , 0 ) ;
2017-04-01 07:20:54 +00:00
PaddedPODArray < Key > parents ( child_ids . begin ( ) , child_ids . end ( ) ) ;
2020-04-20 02:31:21 +00:00
for ( size_t i = 0 ; i < DBMS_HIERARCHICAL_DICTIONARY_MAX_DEPTH ; + + i )
2017-04-01 07:20:54 +00:00
{
size_t out_idx = 0 ;
size_t parents_idx = 0 ;
size_t new_children_idx = 0 ;
2018-08-10 04:02:56 +00:00
while ( out_idx < out_size )
2017-04-01 07:20:54 +00:00
{
/// Already calculated
if ( out [ out_idx ] ! = 0xFF )
{
+ + out_idx ;
continue ;
}
/// No parent
if ( parents [ parents_idx ] = = null_value )
{
out [ out_idx ] = 0 ;
}
/// Found ancestor
else if ( parents [ parents_idx ] = = getAt ( ancestor_ids , parents_idx ) )
{
out [ out_idx ] = 1 ;
}
2017-08-10 03:22:43 +00:00
/// Loop detected
2017-08-04 16:59:50 +00:00
else if ( children [ new_children_idx ] = = parents [ parents_idx ] )
{
2017-08-07 19:02:30 +00:00
out [ out_idx ] = 1 ;
2017-08-04 16:59:50 +00:00
}
2017-08-10 03:22:43 +00:00
/// Found intermediate parent, add this value to search at next loop iteration
2017-04-01 07:20:54 +00:00
else
{
children [ new_children_idx ] = parents [ parents_idx ] ;
+ + new_children_idx ;
}
+ + out_idx ;
+ + parents_idx ;
}
if ( new_children_idx = = 0 )
break ;
/// Transform all children to its parents.
children . resize ( new_children_idx ) ;
parents . resize ( new_children_idx ) ;
toParent ( children , parents ) ;
}
2017-03-25 23:42:04 +00:00
}
void CacheDictionary : : isInVectorVector (
2018-12-10 15:25:45 +00:00
const PaddedPODArray < Key > & child_ids , const PaddedPODArray < Key > & ancestor_ids , PaddedPODArray < UInt8 > & out ) const
2016-12-13 21:15:27 +00:00
{
2017-04-01 07:20:54 +00:00
isInImpl ( child_ids , ancestor_ids , out ) ;
2017-03-25 23:42:04 +00:00
}
2016-12-13 21:15:27 +00:00
2018-12-10 15:25:45 +00:00
void CacheDictionary : : isInVectorConstant ( const PaddedPODArray < Key > & child_ids , const Key ancestor_id , PaddedPODArray < UInt8 > & out ) const
2017-03-25 23:42:04 +00:00
{
2017-04-01 07:20:54 +00:00
isInImpl ( child_ids , ancestor_id , out ) ;
2017-03-25 23:42:04 +00:00
}
2016-12-13 21:15:27 +00:00
2018-12-10 15:25:45 +00:00
void CacheDictionary : : isInConstantVector ( const Key child_id , const PaddedPODArray < Key > & ancestor_ids , PaddedPODArray < UInt8 > & out ) const
2017-03-25 23:42:04 +00:00
{
2017-04-01 07:20:54 +00:00
/// Special case with single child value.
2017-03-25 23:42:04 +00:00
2020-11-16 15:30:16 +00:00
const auto null_value = std : : get < UInt64 > ( hierarchical_attribute - > null_value ) ;
2017-03-25 23:42:04 +00:00
2017-04-01 07:20:54 +00:00
PaddedPODArray < Key > child ( 1 , child_id ) ;
PaddedPODArray < Key > parent ( 1 ) ;
std : : vector < Key > ancestors ( 1 , child_id ) ;
2017-03-25 23:42:04 +00:00
2017-04-01 07:20:54 +00:00
/// Iteratively find all ancestors for child.
2020-04-20 02:31:21 +00:00
for ( size_t i = 0 ; i < DBMS_HIERARCHICAL_DICTIONARY_MAX_DEPTH ; + + i )
2017-04-01 07:20:54 +00:00
{
toParent ( child , parent ) ;
2017-03-25 23:42:04 +00:00
2017-04-01 07:20:54 +00:00
if ( parent [ 0 ] = = null_value )
break ;
2017-03-25 23:42:04 +00:00
2017-04-01 07:20:54 +00:00
child [ 0 ] = parent [ 0 ] ;
ancestors . push_back ( parent [ 0 ] ) ;
}
2017-03-25 23:42:04 +00:00
2017-04-01 07:20:54 +00:00
/// Assuming short hierarchy, so linear search is Ok.
2018-08-10 04:02:56 +00:00
for ( size_t i = 0 , out_size = out . size ( ) ; i < out_size ; + + i )
2017-04-01 07:20:54 +00:00
out [ i ] = std : : find ( ancestors . begin ( ) , ancestors . end ( ) , ancestor_ids [ i ] ) ! = ancestors . end ( ) ;
2017-03-25 23:42:04 +00:00
}
2016-12-13 21:15:27 +00:00
2020-12-27 17:23:09 +00:00
/// Build a result column for `attribute_name`, one value per key in `key_columns.front()`.
/// Missing keys fall back to `default_values_column` (or the attribute's null value).
ColumnPtr CacheDictionary::getColumn(
    const std::string & attribute_name,
    const DataTypePtr & result_type,
    const Columns & key_columns,
    const DataTypes &,
    const ColumnPtr default_values_column) const
{
    ColumnPtr result;

    PaddedPODArray<Key> backup_storage;
    const auto & keys = getColumnVectorData(this, key_columns.front(), backup_storage);
    auto keys_size = keys.size();

    auto & attribute = getAttribute(attribute_name);
    const auto & dictionary_attribute = dict_struct.getAttribute(attribute_name, result_type);

    /// Dispatch on the attribute's runtime type; the lambda is instantiated per type.
    auto type_call = [&](const auto & dictionary_attribute_type)
    {
        using Type = std::decay_t<decltype(dictionary_attribute_type)>;
        using AttributeType = typename Type::AttributeType;
        using ColumnProvider = DictionaryAttributeColumnProvider<AttributeType>;

        const auto & null_value = std::get<AttributeType>(attribute.null_value);
        DictionaryDefaultValueExtractor<AttributeType> default_value_extractor(null_value, default_values_column);

        auto column = ColumnProvider::getColumn(dictionary_attribute, keys_size);

        if constexpr (std::is_same_v<AttributeType, String>)
        {
            getItemsString(attribute, keys, column.get(), default_value_extractor);
        }
        else
        {
            auto & out = column->getData();
            getItemsNumberImpl<AttributeType, AttributeType>(attribute, keys, out, default_value_extractor);
        }

        result = std::move(column);
    };

    callOnDictionaryAttributeType(attribute.type, type_call);

    return result;
}
2021-01-23 16:47:33 +00:00
template < typename AttributeType , typename OutputType , typename DefaultValueExtractor >
2021-01-23 13:18:24 +00:00
void CacheDictionary : : getItemsNumberImpl (
2021-01-23 16:47:33 +00:00
Attribute & attribute ,
2021-01-23 13:18:24 +00:00
const PaddedPODArray < Key > & ids ,
ResultArrayType < OutputType > & out ,
2021-01-23 16:47:33 +00:00
DefaultValueExtractor & default_value_extractor ) const
2021-01-23 13:18:24 +00:00
{
/// First fill everything with default values
const auto rows = ext : : size ( ids ) ;
for ( const auto row : ext : : range ( 0 , rows ) )
out [ row ] = default_value_extractor [ row ] ;
/// Maybe there are duplicate keys, so we remember their indices.
std : : unordered_map < Key , std : : vector < size_t > > cache_expired_or_not_found_ids ;
auto & attribute_array = std : : get < ContainerPtrType < AttributeType > > ( attribute . arrays ) ;
2016-06-07 21:07:44 +00:00
2021-01-23 13:18:24 +00:00
size_t cache_hit = 0 ;
size_t cache_not_found_count = 0 ;
size_t cache_expired_cound = 0 ;
{
const ProfilingScopedReadRWLock read_lock { rw_lock , ProfileEvents : : DictCacheLockReadNs } ;
const auto now = std : : chrono : : system_clock : : now ( ) ;
auto insert_to_answer_routine = [ & ] ( size_t row , size_t idx )
{
auto & cell = cells [ idx ] ;
if ( ! cell . isDefault ( ) )
out [ row ] = static_cast < OutputType > ( attribute_array [ idx ] ) ;
} ;
/// fetch up-to-date values, decide which ones require update
for ( const auto row : ext : : range ( 0 , rows ) )
{
const auto id = ids [ row ] ;
/** cell should be updated if either:
* 1. ids do not match ,
* 2. cell has expired ,
* 3. explicit defaults were specified and cell was set default . */
const auto [ cell_idx , state ] = findCellIdxForGet ( id , now ) ;
if ( state = = ResultState : : FoundAndValid )
2020-12-27 17:23:09 +00:00
{
2021-01-23 13:18:24 +00:00
+ + cache_hit ;
insert_to_answer_routine ( row , cell_idx ) ;
2020-12-27 17:23:09 +00:00
}
2021-01-23 13:18:24 +00:00
else if ( state = = ResultState : : NotFound | | state = = ResultState : : FoundButExpiredPermanently )
2020-12-27 17:23:09 +00:00
{
2021-01-23 13:18:24 +00:00
+ + cache_not_found_count ;
cache_expired_or_not_found_ids [ id ] . push_back ( row ) ;
}
else if ( state = = ResultState : : FoundButExpired )
{
cache_expired_cound + + ;
cache_expired_or_not_found_ids [ id ] . push_back ( row ) ;
2020-12-27 17:23:09 +00:00
2021-01-23 13:18:24 +00:00
if ( allow_read_expired_keys )
insert_to_answer_routine ( row , cell_idx ) ;
2020-12-27 17:23:09 +00:00
}
2021-01-23 13:18:24 +00:00
}
}
2020-12-27 17:23:09 +00:00
2021-01-23 13:18:24 +00:00
ProfileEvents : : increment ( ProfileEvents : : DictCacheKeysExpired , cache_expired_cound ) ;
ProfileEvents : : increment ( ProfileEvents : : DictCacheKeysNotFound , cache_not_found_count ) ;
ProfileEvents : : increment ( ProfileEvents : : DictCacheKeysHit , cache_hit ) ;
query_count . fetch_add ( rows , std : : memory_order_relaxed ) ;
hit_count . fetch_add ( rows - cache_not_found_count - cache_expired_cound , std : : memory_order_release ) ;
if ( ! cache_not_found_count )
{
/// Nothing to update - return
if ( ! cache_expired_cound )
return ;
/// Update async only if allow_read_expired_keys_is_enabledadd condvar usage and better code
if ( allow_read_expired_keys )
{
std : : vector < Key > required_expired_ids ;
required_expired_ids . reserve ( cache_expired_cound ) ;
std : : transform ( std : : begin ( cache_expired_or_not_found_ids ) , std : : end ( cache_expired_or_not_found_ids ) ,
std : : back_inserter ( required_expired_ids ) , [ ] ( auto & pair ) { return pair . first ; } ) ;
/// request new values
auto update_unit_ptr = std : : make_shared < UpdateUnit > ( std : : move ( required_expired_ids ) ) ;
tryPushToUpdateQueueOrThrow ( update_unit_ptr ) ;
/// Nothing to do - return
return ;
2020-12-27 17:23:09 +00:00
}
2021-01-23 13:18:24 +00:00
}
/// From this point we have to update all keys sync.
/// Maybe allow_read_expired_keys_from_cache_dictionary is disabled
/// and there no cache_not_found_ids but some cache_expired.
std : : vector < Key > required_ids ;
required_ids . reserve ( cache_not_found_count + cache_expired_cound ) ;
std : : transform ( std : : begin ( cache_expired_or_not_found_ids ) , std : : end ( cache_expired_or_not_found_ids ) ,
std : : back_inserter ( required_ids ) , [ ] ( auto & pair ) { return pair . first ; } ) ;
/// Request new values
auto update_unit_ptr = std : : make_shared < UpdateUnit > ( std : : move ( required_ids ) ) ;
tryPushToUpdateQueueOrThrow ( update_unit_ptr ) ;
waitForCurrentUpdateFinish ( update_unit_ptr ) ;
/// Add updated keys to answer.
const size_t attribute_index = getAttributeIndex ( attribute . name ) ;
for ( auto & [ key , value ] : update_unit_ptr - > found_ids )
{
if ( value . found )
2020-12-27 17:23:09 +00:00
{
2021-01-23 13:18:24 +00:00
for ( const size_t row : cache_expired_or_not_found_ids [ key ] )
out [ row ] = std : : get < OutputType > ( value . values [ attribute_index ] ) ;
}
}
}
void CacheDictionary : : getItemsString (
Attribute & attribute ,
const PaddedPODArray < Key > & ids ,
ColumnString * out ,
2021-01-23 16:47:33 +00:00
DictionaryDefaultValueExtractor < String > & default_value_extractor ) const
2021-01-23 13:18:24 +00:00
{
const auto rows = ext : : size ( ids ) ;
/// Save on some allocations.
out - > getOffsets ( ) . reserve ( rows ) ;
2020-12-27 17:23:09 +00:00
2021-01-23 13:18:24 +00:00
auto & attribute_array = std : : get < ContainerPtrType < StringRef > > ( attribute . arrays ) ;
2020-12-27 17:23:09 +00:00
2021-01-23 13:18:24 +00:00
auto found_outdated_values = false ;
/// Perform optimistic version, fallback to pessimistic if failed.
{
const ProfilingScopedReadRWLock read_lock { rw_lock , ProfileEvents : : DictCacheLockReadNs } ;
const auto now = std : : chrono : : system_clock : : now ( ) ;
/// Fetch up-to-date values, discard on fail.
for ( const auto row : ext : : range ( 0 , rows ) )
{
const auto id = ids [ row ] ;
const auto [ cell_idx , state ] = findCellIdxForGet ( id , now ) ;
if ( state = = ResultState : : FoundAndValid )
{
auto & cell = cells [ cell_idx ] ;
const auto string_ref = cell . isDefault ( ) ? default_value_extractor [ row ] : attribute_array [ cell_idx ] ;
out - > insertData ( string_ref . data , string_ref . size ) ;
}
else
2020-12-27 17:23:09 +00:00
{
2021-01-23 13:18:24 +00:00
found_outdated_values = true ;
break ;
2020-12-27 17:23:09 +00:00
}
2021-01-23 13:18:24 +00:00
}
}
2020-12-29 15:21:49 +00:00
2021-01-23 13:18:24 +00:00
/// Optimistic code completed successfully.
if ( ! found_outdated_values )
{
query_count . fetch_add ( rows , std : : memory_order_relaxed ) ;
hit_count . fetch_add ( rows , std : : memory_order_release ) ;
ProfileEvents : : increment ( ProfileEvents : : DictCacheKeysHit , ids . size ( ) ) ;
return ;
}
/// Now onto the pessimistic one, discard possible partial results from the optimistic path.
out - > getChars ( ) . resize_assume_reserved ( 0 ) ;
out - > getOffsets ( ) . resize_assume_reserved ( 0 ) ;
/// Mapping: <id> -> { all indices `i` of `ids` such that `ids[i]` = <id> }
std : : unordered_map < Key , std : : vector < size_t > > cache_expired_or_not_found_ids ;
/// we are going to store every string separately
std : : unordered_map < Key , String > local_cache ;
size_t cache_not_found_count = 0 ;
size_t cache_expired_count = 0 ;
2020-12-27 17:23:09 +00:00
2021-01-23 13:18:24 +00:00
size_t total_length = 0 ;
size_t cache_hit = 0 ;
{
const ProfilingScopedReadRWLock read_lock { rw_lock , ProfileEvents : : DictCacheLockReadNs } ;
const auto now = std : : chrono : : system_clock : : now ( ) ;
auto insert_value_routine = [ & ] ( size_t row , size_t id , size_t cell_idx )
{
const auto & cell = cells [ cell_idx ] ;
const auto string_ref = cell . isDefault ( ) ? default_value_extractor [ row ] : attribute_array [ cell_idx ] ;
/// Do not store default, but count it in total length.
if ( ! cell . isDefault ( ) )
local_cache [ id ] = String { string_ref } ;
total_length + = string_ref . size + 1 ;
} ;
for ( const auto row : ext : : range ( 0 , ids . size ( ) ) )
{
const auto id = ids [ row ] ;
const auto [ cell_idx , state ] = findCellIdxForGet ( id , now ) ;
if ( state = = ResultState : : FoundAndValid )
2020-12-27 17:23:09 +00:00
{
2021-01-23 13:18:24 +00:00
+ + cache_hit ;
insert_value_routine ( row , id , cell_idx ) ;
2020-12-27 17:23:09 +00:00
}
2021-01-23 13:18:24 +00:00
else if ( state = = ResultState : : NotFound | | state = = ResultState : : FoundButExpiredPermanently )
2020-12-27 17:23:09 +00:00
{
2021-01-23 13:18:24 +00:00
+ + cache_not_found_count ;
cache_expired_or_not_found_ids [ id ] . push_back ( row ) ;
2020-12-27 17:23:09 +00:00
}
2021-01-23 13:18:24 +00:00
else if ( state = = ResultState : : FoundButExpired )
{
+ + cache_expired_count ;
cache_expired_or_not_found_ids [ id ] . push_back ( row ) ;
2016-06-07 21:07:44 +00:00
2021-01-23 13:18:24 +00:00
if ( allow_read_expired_keys )
insert_value_routine ( row , id , cell_idx ) ;
}
2020-12-27 17:23:09 +00:00
}
2021-01-23 13:18:24 +00:00
}
2020-12-27 17:23:09 +00:00
2021-01-23 13:18:24 +00:00
ProfileEvents : : increment ( ProfileEvents : : DictCacheKeysExpired , cache_expired_count ) ;
ProfileEvents : : increment ( ProfileEvents : : DictCacheKeysNotFound , cache_not_found_count ) ;
ProfileEvents : : increment ( ProfileEvents : : DictCacheKeysHit , cache_hit ) ;
2020-12-29 15:21:49 +00:00
2021-01-23 13:18:24 +00:00
query_count . fetch_add ( rows , std : : memory_order_relaxed ) ;
hit_count . fetch_add ( rows - cache_expired_count - cache_not_found_count , std : : memory_order_release ) ;
/// Async update of expired keys.
if ( ! cache_not_found_count )
{
if ( allow_read_expired_keys & & cache_expired_count )
{
std : : vector < Key > required_expired_ids ;
required_expired_ids . reserve ( cache_expired_count ) ;
std : : transform ( std : : begin ( cache_expired_or_not_found_ids ) , std : : end ( cache_expired_or_not_found_ids ) ,
std : : back_inserter ( required_expired_ids ) , [ ] ( auto & pair ) { return pair . first ; } ) ;
auto update_unit_ptr = std : : make_shared < UpdateUnit > ( std : : move ( required_expired_ids ) ) ;
tryPushToUpdateQueueOrThrow ( update_unit_ptr ) ;
/// Insert all found keys and defaults to output array.
out - > getChars ( ) . reserve ( total_length ) ;
for ( const auto row : ext : : range ( 0 , ext : : size ( ids ) ) )
{
const auto id = ids [ row ] ;
StringRef value ;
/// Previously we stored found keys in map.
const auto it = local_cache . find ( id ) ;
if ( it ! = local_cache . end ( ) )
value = StringRef ( it - > second ) ;
else
value = default_value_extractor [ row ] ;
out - > insertData ( value . data , value . size ) ;
}
/// Nothing to do else.
return ;
}
}
/// We will request both cache_not_found_ids and cache_expired_ids sync.
std : : vector < Key > required_ids ;
required_ids . reserve ( cache_not_found_count + cache_expired_count ) ;
std : : transform (
std : : begin ( cache_expired_or_not_found_ids ) , std : : end ( cache_expired_or_not_found_ids ) ,
std : : back_inserter ( required_ids ) , [ ] ( auto & pair ) { return pair . first ; } ) ;
auto update_unit_ptr = std : : make_shared < UpdateUnit > ( std : : move ( required_ids ) ) ;
tryPushToUpdateQueueOrThrow ( update_unit_ptr ) ;
waitForCurrentUpdateFinish ( update_unit_ptr ) ;
const size_t attribute_index = getAttributeIndex ( attribute . name ) ;
/// Only calculate the total length.
for ( auto & [ key , value ] : update_unit_ptr - > found_ids )
{
if ( value . found )
{
const auto found_value_ref = std : : get < String > ( value . values [ attribute_index ] ) ;
total_length + = ( found_value_ref . size ( ) + 1 ) * cache_expired_or_not_found_ids [ key ] . size ( ) ;
}
else
{
for ( const auto row : cache_expired_or_not_found_ids [ key ] )
total_length + = default_value_extractor [ row ] . size + 1 ;
}
}
out - > getChars ( ) . reserve ( total_length ) ;
for ( const auto row : ext : : range ( 0 , ext : : size ( ids ) ) )
{
const auto id = ids [ row ] ;
StringRef value ;
/// We have two maps: found in cache and found in source.
const auto local_it = local_cache . find ( id ) ;
if ( local_it ! = local_cache . end ( ) )
value = StringRef ( local_it - > second ) ;
else
{
const auto found_it = update_unit_ptr - > found_ids . find ( id ) ;
/// Previously we didn't store defaults in local cache.
if ( found_it ! = update_unit_ptr - > found_ids . end ( ) & & found_it - > second . found )
value = std : : get < String > ( found_it - > second . values [ attribute_index ] ) ;
else
value = default_value_extractor [ row ] ;
}
out - > insertData ( value . data , value . size ) ;
}
2016-06-07 21:07:44 +00:00
}
2021-01-23 13:18:24 +00:00
2020-09-17 18:57:57 +00:00
/// Build an overload set out of several lambdas, for use with std::visit.
template <class... Ts>
struct Overloaded : Ts...
{
    using Ts::operator()...;
};

/// C++17 deduction guide so `Overloaded{lambda1, lambda2, ...}` works without
/// spelling out the template arguments.
template <class... Ts>
Overloaded(Ts...) -> Overloaded<Ts...>;
2020-09-17 18:57:57 +00:00
std : : string CacheDictionary : : AttributeValuesForKey : : dump ( )
{
2020-11-17 14:36:04 +00:00
WriteBufferFromOwnString os ;
2020-09-17 18:57:57 +00:00
for ( auto & attr : values )
2020-09-28 14:48:32 +00:00
std : : visit ( Overloaded {
2020-09-17 18:57:57 +00:00
[ & os ] ( UInt8 arg ) { os < < " type: UInt8, value: " < < std : : to_string ( arg ) < < " \n " ; } ,
[ & os ] ( UInt16 arg ) { os < < " type: UInt16, value: " < < std : : to_string ( arg ) < < " \n " ; } ,
[ & os ] ( UInt32 arg ) { os < < " type: UInt32, value: " < < std : : to_string ( arg ) < < " \n " ; } ,
[ & os ] ( UInt64 arg ) { os < < " type: UInt64, value: " < < std : : to_string ( arg ) < < " \n " ; } ,
[ & os ] ( UInt128 arg ) { os < < " type: UInt128, value: " < < arg . toHexString ( ) < < " \n " ; } ,
[ & os ] ( Int8 arg ) { os < < " type: Int8, value: " < < std : : to_string ( arg ) < < " \n " ; } ,
[ & os ] ( Int16 arg ) { os < < " type: Int16, value: " < < std : : to_string ( arg ) < < " \n " ; } ,
[ & os ] ( Int32 arg ) { os < < " type: Int32, value: " < < std : : to_string ( arg ) < < " \n " ; } ,
[ & os ] ( Int64 arg ) { os < < " type: Int64, value: " < < std : : to_string ( arg ) < < " \n " ; } ,
[ & os ] ( Decimal32 arg ) { os < < " type: Decimal32, value: " < < std : : to_string ( arg ) < < " \n " ; } ,
[ & os ] ( Decimal64 arg ) { os < < " type: Decimal64, value: " < < std : : to_string ( arg ) < < " \n " ; } ,
[ & os ] ( Decimal128 ) { os < < " type: Decimal128, value: ??? " < < " \n " ; } ,
[ & os ] ( Float32 arg ) { os < < " type: Float32, value: " < < std : : to_string ( arg ) < < " \n " ; } ,
[ & os ] ( Float64 arg ) { os < < " type: Float64, value: " < < std : : to_string ( arg ) < < " \n " ; } ,
[ & os ] ( String arg ) { os < < " type: String, value: " < < arg + " \n " ; }
} , attr ) ;
return os . str ( ) ;
} ;
2020-09-25 16:56:22 +00:00
std : : string CacheDictionary : : UpdateUnit : : dumpFoundIds ( )
2020-09-17 18:57:57 +00:00
{
2020-11-17 14:36:04 +00:00
WriteBufferFromOwnString os ;
2020-09-17 18:57:57 +00:00
for ( auto it : found_ids )
{
2020-10-01 18:28:40 +00:00
os < < " Key: " < < std : : to_string ( it . first ) < < " \n " ;
2020-09-17 18:57:57 +00:00
if ( it . second . found )
2020-10-01 18:28:40 +00:00
os < < it . second . dump ( ) < < " \n " ;
2020-09-17 18:57:57 +00:00
}
2020-10-01 18:28:40 +00:00
return os . str ( ) ;
2020-09-17 18:57:57 +00:00
} ;
2016-06-07 21:07:44 +00:00
2020-11-13 16:16:56 +00:00
/// Returns cell_idx in handmade open addressing cache table and the state of the cell stored the key.
2020-10-02 19:09:48 +00:00
CacheDictionary : : FindResult CacheDictionary : : findCellIdxForGet ( const Key & id , const time_point_t now ) const
{
auto pos = getCellIdx ( id ) ;
const auto stop = pos + max_collision_length ;
for ( ; pos < stop ; + + pos )
{
const auto cell_idx = pos & size_overlap_mask ;
const auto & cell = cells [ cell_idx ] ;
if ( cell . id ! = id )
continue ;
2020-11-13 16:16:56 +00:00
if ( isExpiredPermanently ( now , cell . expiresAt ( ) ) )
return { cell_idx , ResultState : : FoundButExpiredPermanently } ;
2020-10-02 19:09:48 +00:00
2020-11-13 16:16:56 +00:00
if ( isExpired ( now , cell . expiresAt ( ) ) )
return { cell_idx , ResultState : : FoundButExpired } ;
return { cell_idx , ResultState : : FoundAndValid } ;
2020-10-02 19:09:48 +00:00
}
2020-11-13 16:16:56 +00:00
return { pos & size_overlap_mask , ResultState : : NotFound } ;
2020-10-02 19:09:48 +00:00
}
2020-11-17 14:36:04 +00:00
/// Returns cell_idx such that cells[cell_idx].id = id or the oldest cell in bounds of max_coolision_length.
2020-10-02 19:09:48 +00:00
size_t CacheDictionary : : findCellIdxForSet ( const Key & id ) const
2017-02-14 18:06:21 +00:00
{
2017-04-01 07:20:54 +00:00
auto pos = getCellIdx ( id ) ;
auto oldest_id = pos ;
2020-09-30 14:35:37 +00:00
auto oldest_time = time_point_t : : max ( ) ;
2017-04-01 07:20:54 +00:00
const auto stop = pos + max_collision_length ;
for ( ; pos < stop ; + + pos )
{
const auto cell_idx = pos & size_overlap_mask ;
const auto & cell = cells [ cell_idx ] ;
if ( cell . id ! = id )
{
/// maybe we already found nearest expired cell (try minimize collision_length on insert)
2020-10-02 19:09:48 +00:00
if ( cell . expiresAt ( ) < oldest_time )
2017-04-01 07:20:54 +00:00
{
2020-10-02 14:26:39 +00:00
oldest_time = cell . expiresAt ( ) ;
2017-04-01 07:20:54 +00:00
oldest_id = cell_idx ;
}
continue ;
}
2020-10-02 19:09:48 +00:00
/// We found the exact place for id.
return cell_idx ;
2017-04-01 07:20:54 +00:00
}
2020-10-02 19:09:48 +00:00
return oldest_id ;
2017-02-14 18:06:21 +00:00
}
/// Returns a ColumnUInt8 with one flag per requested key: 1 if the key is cached
/// with a real (non-default) value, 0 otherwise.
/// Keys absent from the cache (or permanently expired) force a synchronous update
/// from the source; merely expired keys are updated asynchronously when
/// allow_read_expired_keys is set, in which case their stale answer is returned.
ColumnUInt8::Ptr CacheDictionary::hasKeys(const Columns & key_columns, const DataTypes &) const
{
    PaddedPODArray<Key> backup_storage;
    const auto & ids = getColumnVectorData(this, key_columns.front(), backup_storage);

    auto result = ColumnUInt8::create(ext::size(ids));
    auto & out = result->getData();

    /// There are three types of ids.
    /// - Valid ids. These ids are presented in local cache and their lifetime is not expired.
    /// - CacheExpired ids. Ids that are in local cache, but their values are rotted (lifetime is expired).
    /// - CacheNotFound ids. We have to go to external storage to know its value.

    /// Mark everything as absent.
    const auto rows = ext::size(ids);
    for (const auto row : ext::range(0, rows))
        out[row] = false;

    /// Mapping: <id> -> { all indices `i` of `ids` such that `ids[i]` = <id> }
    std::unordered_map<Key, std::vector<size_t>> cache_expired_or_not_found_ids;

    size_t cache_hit = 0;
    size_t cache_expired_count = 0;
    size_t cache_not_found_count = 0;

    {
        /// Read-side lock: classification only inspects the cache, it does not modify it.
        const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs};

        const auto now = std::chrono::system_clock::now();

        /// fetch up-to-date values, decide which ones require update
        for (const auto row : ext::range(0, rows))
        {
            const auto id = ids[row];
            const auto [cell_idx, state] = findCellIdxForGet(id, now);
            auto & cell = cells[cell_idx];

            /// A cached "default" cell means the source reported the key absent.
            auto insert_to_answer_routine = [&]()
            {
                out[row] = !cell.isDefault();
            };

            if (state == ResultState::FoundAndValid)
            {
                ++cache_hit;
                insert_to_answer_routine();
            }
            else if (state == ResultState::NotFound || state == ResultState::FoundButExpiredPermanently)
            {
                /// Permanently expired equals to not found semantically.
                ++cache_not_found_count;
                cache_expired_or_not_found_ids[id].push_back(row);
            }
            else if (state == ResultState::FoundButExpired)
            {
                cache_expired_count++;
                cache_expired_or_not_found_ids[id].push_back(row);

                /// Serve the stale value now; a background refresh may be scheduled below.
                if (allow_read_expired_keys)
                    insert_to_answer_routine();
            }
        }
    }

    ProfileEvents::increment(ProfileEvents::DictCacheKeysExpired, cache_expired_count);
    ProfileEvents::increment(ProfileEvents::DictCacheKeysNotFound, cache_not_found_count);
    ProfileEvents::increment(ProfileEvents::DictCacheKeysHit, cache_hit);

    query_count.fetch_add(rows, std::memory_order_relaxed);
    hit_count.fetch_add(rows - cache_expired_count - cache_not_found_count, std::memory_order_release);

    if (!cache_not_found_count)
    {
        /// Nothing to update - return;
        if (!cache_expired_count)
            return result;

        /// Only expired keys: answers were already filled from stale cells above,
        /// so the refresh can happen asynchronously.
        if (allow_read_expired_keys)
        {
            std::vector<Key> required_expired_ids;
            required_expired_ids.reserve(cache_expired_count);

            std::transform(
                std::begin(cache_expired_or_not_found_ids), std::end(cache_expired_or_not_found_ids),
                std::back_inserter(required_expired_ids), [](auto & pair) { return pair.first; });

            auto update_unit_ptr = std::make_shared<UpdateUnit>(std::move(required_expired_ids));

            tryPushToUpdateQueueOrThrow(update_unit_ptr);
            /// Update is async - no need to wait.
            return result;
        }
    }

    /// At this point we have two situations.
    /// There may be both types of keys: expired and not_found.
    /// We will update them all synchronously.

    std::vector<Key> required_ids;
    required_ids.reserve(cache_not_found_count + cache_expired_count);

    std::transform(
        std::begin(cache_expired_or_not_found_ids), std::end(cache_expired_or_not_found_ids),
        std::back_inserter(required_ids), [](auto & pair) { return pair.first; });

    auto update_unit_ptr = std::make_shared<UpdateUnit>(std::move(required_ids));

    tryPushToUpdateQueueOrThrow(update_unit_ptr);
    waitForCurrentUpdateFinish(update_unit_ptr);

    /// Patch the answer with the freshly fetched keys (every row that asked for them).
    for (auto & [key, value] : update_unit_ptr->found_ids)
    {
        if (value.found)
            for (const auto row : cache_expired_or_not_found_ids[key])
                out[row] = true;
    }

    return result;
}
void CacheDictionary : : createAttributes ( )
{
2017-06-15 17:23:14 +00:00
const auto attributes_size = dict_struct . attributes . size ( ) ;
attributes . reserve ( attributes_size ) ;
2017-04-01 07:20:54 +00:00
bytes_allocated + = size * sizeof ( CellMetadata ) ;
2017-06-15 17:23:14 +00:00
bytes_allocated + = attributes_size * sizeof ( attributes . front ( ) ) ;
2017-04-01 07:20:54 +00:00
for ( const auto & attribute : dict_struct . attributes )
{
attribute_index_by_name . emplace ( attribute . name , attributes . size ( ) ) ;
2020-09-18 14:14:37 +00:00
attributes . push_back ( createAttributeWithTypeAndName ( attribute . underlying_type , attribute . name , attribute . null_value ) ) ;
2017-04-01 07:20:54 +00:00
if ( attribute . hierarchical )
{
2018-12-10 15:25:45 +00:00
hierarchical_attribute = & attributes . back ( ) ;
2017-04-01 07:20:54 +00:00
2019-08-03 11:02:40 +00:00
if ( hierarchical_attribute - > type ! = AttributeUnderlyingType : : utUInt64 )
2019-12-25 23:12:12 +00:00
throw Exception { full_name + " : hierarchical attribute must be UInt64. " , ErrorCodes : : TYPE_MISMATCH } ;
2017-04-01 07:20:54 +00:00
}
}
2016-06-07 21:07:44 +00:00
}
/* For unknown reason clang-tidy wants this function to be static, but it uses bytes_allocated, which is a class member.
 * NOLINT(readability-convert-member-functions-to-static) */
/// Creates one Attribute descriptor: stores the typed null value and allocates a
/// flat container of `size` elements for the cached values; updates bytes_allocated.
CacheDictionary::Attribute CacheDictionary::createAttributeWithTypeAndName(const AttributeUnderlyingType type, const String & name, const Field & null_value)
{
    Attribute attr{type, name, {}, {}};

    switch (type)
    {
/* Macro argument should be enclosed in parentheses, but if do so we cannot initialize \
 * NearestFieldType which takes TYPE as a template parameter. */
#define DISPATCH(TYPE) \
    case AttributeUnderlyingType::ut##TYPE: \
    { \
        attr.null_value = TYPE(null_value.get<NearestFieldType<TYPE>>()); /* NOLINT(bugprone-macro-parentheses) */ \
        attr.arrays = std::make_unique<ContainerType<TYPE>>(size); /* NOLINT(bugprone-macro-parentheses) */ \
        bytes_allocated += size * sizeof(TYPE); \
        break; \
    }
        DISPATCH(UInt8)
        DISPATCH(UInt16)
        DISPATCH(UInt32)
        DISPATCH(UInt64)
        DISPATCH(UInt128)
        DISPATCH(Int8)
        DISPATCH(Int16)
        DISPATCH(Int32)
        DISPATCH(Int64)
        DISPATCH(Decimal32)
        DISPATCH(Decimal64)
        DISPATCH(Decimal128)
        DISPATCH(Float32)
        DISPATCH(Float64)
#undef DISPATCH

        case AttributeUnderlyingType::utString:
        {
            attr.null_value = null_value.get<String>();
            attr.arrays = std::make_unique<ContainerType<StringRef>>(size);
            bytes_allocated += size * sizeof(StringRef);

            /// Strings live in a shared arena, created lazily on the first string attribute.
            if (!string_arena)
                string_arena = std::make_unique<ArenaWithFreeLists>();
            break;
        }
    }

    return attr;
}
/// Writes the attribute's configured null value into cell `idx`.
/// Used when the source reported a key absent (see cell.setDefault() in update()).
void CacheDictionary::setDefaultAttributeValue(Attribute & attribute, const Key idx) const
{
    switch (attribute.type)
    {
/* Macro argument should be enclosed in parentheses, but if do so the std::get
 * template argument lists below would not parse. */
#define DISPATCH(TYPE) \
    case AttributeUnderlyingType::ut##TYPE: \
        std::get<ContainerPtrType<TYPE>>(attribute.arrays)[idx] = std::get<TYPE>(attribute.null_value); /* NOLINT(bugprone-macro-parentheses) */ \
        break;
        DISPATCH(UInt8)
        DISPATCH(UInt16)
        DISPATCH(UInt32)
        DISPATCH(UInt64)
        DISPATCH(UInt128)
        DISPATCH(Int8)
        DISPATCH(Int16)
        DISPATCH(Int32)
        DISPATCH(Int64)
        DISPATCH(Decimal32)
        DISPATCH(Decimal64)
        DISPATCH(Decimal128)
        DISPATCH(Float32)
        DISPATCH(Float64)
#undef DISPATCH

        case AttributeUnderlyingType::utString:
        {
            const auto & null_value_ref = std::get<String>(attribute.null_value);
            auto & string_ref = std::get<ContainerPtrType<StringRef>>(attribute.arrays)[idx];

            /// If the cell already points at the shared null string there is nothing to do;
            /// otherwise release any arena-owned string before aliasing the null value.
            if (string_ref.data != null_value_ref.data())
            {
                if (string_ref.data)
                    string_arena->free(const_cast<char *>(string_ref.data), string_ref.size);

                string_ref = StringRef{null_value_ref};
            }

            break;
        }
    }
}
/// Stores `value` (a Field read from the source block) into the typed flat array of
/// `attribute` at cell index `idx`.
/// Narrow numeric cases read the widened Field representation (e.g. UInt8..UInt32
/// via get<UInt64>(), Float32 via get<Float64>()), as the get<> calls below show.
void CacheDictionary::setAttributeValue(Attribute & attribute, const Key idx, const Field & value) const
{
    switch (attribute.type)
    {
        case AttributeUnderlyingType::utUInt8:
            std::get<ContainerPtrType<UInt8>>(attribute.arrays)[idx] = value.get<UInt64>();
            break;
        case AttributeUnderlyingType::utUInt16:
            std::get<ContainerPtrType<UInt16>>(attribute.arrays)[idx] = value.get<UInt64>();
            break;
        case AttributeUnderlyingType::utUInt32:
            std::get<ContainerPtrType<UInt32>>(attribute.arrays)[idx] = value.get<UInt64>();
            break;
        case AttributeUnderlyingType::utUInt64:
            std::get<ContainerPtrType<UInt64>>(attribute.arrays)[idx] = value.get<UInt64>();
            break;
        case AttributeUnderlyingType::utUInt128:
            std::get<ContainerPtrType<UInt128>>(attribute.arrays)[idx] = value.get<UInt128>();
            break;
        case AttributeUnderlyingType::utInt8:
            std::get<ContainerPtrType<Int8>>(attribute.arrays)[idx] = value.get<Int64>();
            break;
        case AttributeUnderlyingType::utInt16:
            std::get<ContainerPtrType<Int16>>(attribute.arrays)[idx] = value.get<Int64>();
            break;
        case AttributeUnderlyingType::utInt32:
            std::get<ContainerPtrType<Int32>>(attribute.arrays)[idx] = value.get<Int64>();
            break;
        case AttributeUnderlyingType::utInt64:
            std::get<ContainerPtrType<Int64>>(attribute.arrays)[idx] = value.get<Int64>();
            break;
        case AttributeUnderlyingType::utFloat32:
            std::get<ContainerPtrType<Float32>>(attribute.arrays)[idx] = value.get<Float64>();
            break;
        case AttributeUnderlyingType::utFloat64:
            std::get<ContainerPtrType<Float64>>(attribute.arrays)[idx] = value.get<Float64>();
            break;
        case AttributeUnderlyingType::utDecimal32:
            std::get<ContainerPtrType<Decimal32>>(attribute.arrays)[idx] = value.get<Decimal32>();
            break;
        case AttributeUnderlyingType::utDecimal64:
            std::get<ContainerPtrType<Decimal64>>(attribute.arrays)[idx] = value.get<Decimal64>();
            break;
        case AttributeUnderlyingType::utDecimal128:
            std::get<ContainerPtrType<Decimal128>>(attribute.arrays)[idx] = value.get<Decimal128>();
            break;

        case AttributeUnderlyingType::utString:
        {
            const auto & string = value.get<String>();
            auto & string_ref = std::get<ContainerPtrType<StringRef>>(attribute.arrays)[idx];
            const auto & null_value_ref = std::get<String>(attribute.null_value);

            /// free memory unless it points to a null_value
            if (string_ref.data && string_ref.data != null_value_ref.data())
                string_arena->free(const_cast<char *>(string_ref.data), string_ref.size);

            const auto str_size = string.size();
            if (str_size != 0)
            {
                /// +1: copy the terminating zero of std::string as well.
                auto * string_ptr = string_arena->alloc(str_size + 1);
                std::copy(string.data(), string.data() + str_size + 1, string_ptr);
                string_ref = StringRef{string_ptr, str_size};
            }
            else
                string_ref = {};

            break;
        }
    }
}
2016-08-07 09:09:18 +00:00
CacheDictionary : : Attribute & CacheDictionary : : getAttribute ( const std : : string & attribute_name ) const
2020-09-17 18:57:57 +00:00
{
const size_t attr_index = getAttributeIndex ( attribute_name ) ;
return attributes [ attr_index ] ;
}
/// Resolves an attribute name to its position in `attributes`.
/// Throws BAD_ARGUMENTS when the dictionary has no attribute with this name.
size_t CacheDictionary::getAttributeIndex(const std::string & attribute_name) const
{
    const auto found = attribute_index_by_name.find(attribute_name);

    if (found == attribute_index_by_name.end())
        throw Exception{full_name + ": no such attribute '" + attribute_name + "'", ErrorCodes::BAD_ARGUMENTS};

    return found->second;
}
2017-04-27 17:16:24 +00:00
bool CacheDictionary : : isEmptyCell ( const UInt64 idx ) const
{
2020-09-30 14:35:37 +00:00
return ( idx ! = zero_cell_idx & & cells [ idx ] . id = = 0 ) | | ( cells [ idx ] . deadline = = time_point_t ( ) ) ;
2017-04-27 17:16:24 +00:00
}
2020-10-02 14:26:39 +00:00
2017-04-27 17:16:24 +00:00
PaddedPODArray < CacheDictionary : : Key > CacheDictionary : : getCachedIds ( ) const
{
2017-05-04 18:14:23 +00:00
const ProfilingScopedReadRWLock read_lock { rw_lock , ProfileEvents : : DictCacheLockReadNs } ;
2017-04-27 17:16:24 +00:00
PaddedPODArray < Key > array ;
for ( size_t idx = 0 ; idx < cells . size ( ) ; + + idx )
{
2017-05-30 15:02:44 +00:00
auto & cell = cells [ idx ] ;
2020-11-18 13:58:28 +00:00
if ( ! isEmptyCell ( idx ) & & ! cells [ idx ] . isDefault ( ) )
2017-04-27 17:16:24 +00:00
array . push_back ( cell . id ) ;
}
return array ;
}
2019-02-18 18:51:46 +00:00
BlockInputStreamPtr CacheDictionary : : getBlockInputStream ( const Names & column_names , size_t max_block_size ) const
2017-04-27 17:16:24 +00:00
{
2020-12-19 13:24:51 +00:00
using BlockInputStreamType = DictionaryBlockInputStream < Key > ;
2017-05-29 17:26:45 +00:00
return std : : make_shared < BlockInputStreamType > ( shared_from_this ( ) , max_block_size , getCachedIds ( ) , column_names ) ;
2017-04-27 17:16:24 +00:00
}
2019-08-30 09:50:38 +00:00
std : : exception_ptr CacheDictionary : : getLastException ( ) const
{
const ProfilingScopedReadRWLock read_lock { rw_lock , ProfileEvents : : DictCacheLockReadNs } ;
return last_exception ;
}
/// Registers the 'cache' layout in the dictionary factory.
/// The lambda validates the configuration (complex keys and ranges are not
/// supported, size must be positive, 'require_nonempty' is forbidden), reads the
/// cache-specific settings with their defaults, and constructs the dictionary.
void registerDictionaryCache(DictionaryFactory & factory)
{
    auto create_layout = [=](const std::string & full_name,
                             const DictionaryStructure & dict_struct,
                             const Poco::Util::AbstractConfiguration & config,
                             const std::string & config_prefix,
                             DictionarySourcePtr source_ptr) -> DictionaryPtr
    {
        /// Cache layout supports only simple UInt64 keys.
        if (dict_struct.key)
            throw Exception{"'key' is not supported for dictionary of layout 'cache'",
                            ErrorCodes::UNSUPPORTED_METHOD};

        if (dict_struct.range_min || dict_struct.range_max)
            throw Exception{full_name
                                + ": elements .structure.range_min and .structure.range_max should be defined only "
                                  "for a dictionary of layout 'range_hashed'",
                            ErrorCodes::BAD_ARGUMENTS};

        const auto & layout_prefix = config_prefix + ".layout";

        const size_t size = config.getUInt64(layout_prefix + ".cache.size_in_cells");
        if (size == 0)
            throw Exception{full_name + ": dictionary of layout 'cache' cannot have 0 cells",
                            ErrorCodes::TOO_SMALL_BUFFER_SIZE};

        /// A cache is allowed to be empty by design, so 'require_nonempty' is rejected.
        const bool require_nonempty = config.getBool(config_prefix + ".require_nonempty", false);
        if (require_nonempty)
            throw Exception{full_name + ": dictionary of layout 'cache' cannot have 'require_nonempty' attribute set",
                            ErrorCodes::BAD_ARGUMENTS};

        const auto dict_id = StorageID::fromDictionaryConfig(config, config_prefix);

        const DictionaryLifetime dict_lifetime{config, config_prefix + ".lifetime"};

        /// Defaults to lifetime.max_sec: after this many seconds an expired key is
        /// treated as not found ("permanently expired").
        const size_t strict_max_lifetime_seconds =
            config.getUInt64(layout_prefix + ".cache.strict_max_lifetime_seconds", static_cast<size_t>(dict_lifetime.max_sec));

        const size_t max_update_queue_size =
            config.getUInt64(layout_prefix + ".cache.max_update_queue_size", 100000);
        if (max_update_queue_size == 0)
            throw Exception{full_name + ": dictionary of layout 'cache' cannot have empty update queue of size 0",
                            ErrorCodes::TOO_SMALL_BUFFER_SIZE};

        const bool allow_read_expired_keys =
            config.getBool(layout_prefix + ".cache.allow_read_expired_keys", false);

        const size_t update_queue_push_timeout_milliseconds =
            config.getUInt64(layout_prefix + ".cache.update_queue_push_timeout_milliseconds", 10);
        if (update_queue_push_timeout_milliseconds < 10)
            throw Exception{full_name + ": dictionary of layout 'cache' have too little update_queue_push_timeout",
                            ErrorCodes::BAD_ARGUMENTS};

        const size_t query_wait_timeout_milliseconds =
            config.getUInt64(layout_prefix + ".cache.query_wait_timeout_milliseconds", 60000);

        const size_t max_threads_for_updates =
            config.getUInt64(layout_prefix + ".max_threads_for_updates", 4);
        if (max_threads_for_updates == 0)
            throw Exception{full_name + ": dictionary of layout 'cache' cannot have zero threads for updates.",
                            ErrorCodes::BAD_ARGUMENTS};

        return std::make_unique<CacheDictionary>(
            dict_id,
            dict_struct,
            std::move(source_ptr),
            dict_lifetime,
            strict_max_lifetime_seconds,
            size,
            allow_read_expired_keys,
            max_update_queue_size,
            update_queue_push_timeout_milliseconds,
            query_wait_timeout_milliseconds,
            max_threads_for_updates);
    };
    factory.registerLayout("cache", create_layout, false);
}
/// Body of each background update thread.
/// Pops UpdateUnits from the queue, performs the source fetch, and signals
/// completion (or failure) to threads blocked in waitForCurrentUpdateFinish()
/// via update_mutex / is_update_finished.
void CacheDictionary::updateThreadFunction()
{
    setThreadName("AsyncUpdater");
    while (!finished)
    {
        UpdateUnitPtr popped;
        update_queue.pop(popped);

        /// Re-check after the (blocking) pop: shutdown may have been requested meanwhile.
        if (finished)
            break;

        try
        {
            /// Update a bunch of ids.
            update(popped);

            /// Notify thread about finished updating the bunch of ids
            /// where their own ids were included.
            std::unique_lock<std::mutex> lock(update_mutex);

            popped->is_done = true;
            is_update_finished.notify_all();
        }
        catch (...)
        {
            /// Hand the exception to the waiting thread instead of letting it
            /// escape the worker; waiters check current_exception in their predicate.
            std::unique_lock<std::mutex> lock(update_mutex);
            popped->current_exception = std::current_exception();
            is_update_finished.notify_all();
        }
    }
}
/// Blocks the query thread until the background worker finishes the given unit
/// (is_done) or fails it (current_exception), up to query_wait_timeout_milliseconds.
/// Throws TIMEOUT_EXCEEDED on timeout and CACHE_DICTIONARY_UPDATE_FAIL when the
/// worker recorded an exception.
void CacheDictionary::waitForCurrentUpdateFinish(UpdateUnitPtr & update_unit_ptr) const
{
    std::unique_lock<std::mutex> update_lock(update_mutex);

    bool result = is_update_finished.wait_for(
            update_lock,
            std::chrono::milliseconds(query_wait_timeout_milliseconds),
            [&] { return update_unit_ptr->is_done || update_unit_ptr->current_exception; });

    if (!result)
    {
        throw DB::Exception(ErrorCodes::TIMEOUT_EXCEEDED,
                            "Dictionary {} source seems unavailable, because {}ms timeout exceeded.",
                            getDictionaryID().getNameForLogs(), toString(query_wait_timeout_milliseconds));
    }

    if (update_unit_ptr->current_exception)
    {
        // Don't just rethrow it, because sharing the same exception object
        // between multiple threads can lead to weird effects if they decide to
        // modify it, for example, by adding some error context.
        try
        {
            std::rethrow_exception(update_unit_ptr->current_exception);
        }
        catch (...)
        {
            throw DB::Exception(ErrorCodes::CACHE_DICTIONARY_UPDATE_FAIL,
                "Update failed for dictionary '{}': {}",
                getDictionaryID().getNameForLogs(),
                getCurrentExceptionMessage(true /*with stack trace*/,
                    true /*check embedded stack trace*/));
        }
    }
}
void CacheDictionary : : tryPushToUpdateQueueOrThrow ( UpdateUnitPtr & update_unit_ptr ) const
{
if ( ! update_queue . tryPush ( update_unit_ptr , update_queue_push_timeout_milliseconds ) )
2020-07-14 20:32:13 +00:00
throw DB : : Exception ( ErrorCodes : : CACHE_DICTIONARY_UPDATE_FAIL ,
" Cannot push to internal update queue in dictionary {}. "
" Timelimit of {} ms. exceeded. Current queue size is {} " ,
getDictionaryID ( ) . getNameForLogs ( ) , std : : to_string ( update_queue_push_timeout_milliseconds ) ,
std : : to_string ( update_queue . size ( ) ) ) ;
2020-02-06 12:18:19 +00:00
}
/// Extracts row `position` from every attribute column into a vector of
/// AttributeValue variants (one entry per column, in column order).
/// Note: a column of an unrecognized type is silently skipped (no entry appended) -
/// presumably all attribute columns match the dispatched types; verify at call sites.
std::vector<CacheDictionary::AttributeValue> CacheDictionary::getAttributeValuesFromBlockAtPosition(const std::vector<const IColumn *> & column_ptrs, size_t position)
{
    std::vector<AttributeValue> answer;
    answer.reserve(column_ptrs.size());

    for (const auto * pure_column : column_ptrs)
    {
/// Try each concrete numeric column type in turn; the first match wins.
#define DISPATCH(TYPE) \
        if (const auto * column = typeid_cast<const Column##TYPE *>(pure_column)) \
        { \
            answer.emplace_back(column->getElement(position)); \
            continue; \
        }
        DISPATCH(UInt8)
        DISPATCH(UInt16)
        DISPATCH(UInt32)
        DISPATCH(UInt64)
        DISPATCH(UInt128)
        DISPATCH(Int8)
        DISPATCH(Int16)
        DISPATCH(Int32)
        DISPATCH(Int64)
        DISPATCH(Decimal<Decimal32>)
        DISPATCH(Decimal<Decimal64>)
        DISPATCH(Decimal<Decimal128>)
        DISPATCH(Float32)
        DISPATCH(Float64)
#undef DISPATCH

        /// Strings are copied out of the column (owning std::string in the variant).
        if (const auto * column = typeid_cast<const ColumnString *>(pure_column))
        {
            answer.emplace_back(column->getDataAt(position).toString());
            continue;
        }
    }
    return answer;
}
/// Fetches update_unit_ptr->requested_ids from the source and writes the results
/// into the cache and into update_unit_ptr->found_ids.
/// Respects the error backoff window: while backoff_end_time is in the future the
/// source is not contacted at all and CACHE_DICTIONARY_UPDATE_FAIL is thrown.
/// Keys the source did not return are cached as "default" cells.
void CacheDictionary::update(UpdateUnitPtr & update_unit_ptr)
{
    CurrentMetrics::Increment metric_increment{CurrentMetrics::DictCacheRequests};
    ProfileEvents::increment(ProfileEvents::DictCacheKeysRequested, update_unit_ptr->requested_ids.size());

    auto & map_ids = update_unit_ptr->found_ids;
    size_t found_num = 0;

    const auto now = std::chrono::system_clock::now();

    if (now > backoff_end_time.load())
    {
        try
        {
            auto current_source_ptr = getSourceAndUpdateIfNeeded();

            Stopwatch watch;
            BlockInputStreamPtr stream = current_source_ptr->loadIds(update_unit_ptr->requested_ids);

            stream->readPrefix();

            while (true)
            {
                Block block = stream->read();
                if (!block)
                    break;

                const auto * id_column = typeid_cast<const ColumnUInt64 *>(block.safeGetByPosition(0).column.get());
                if (!id_column)
                    throw Exception{ErrorCodes::TYPE_MISMATCH,
                        "{}: id column has type different from UInt64.", getDictionaryID().getNameForLogs()};

                const auto & ids = id_column->getData();

                /// cache column pointers
                const auto column_ptrs = ext::map<std::vector>(
                    ext::range(0, attributes.size()), [&block](size_t i) { return block.safeGetByPosition(i + 1).column.get(); });

                found_num += ids.size();

                for (const auto i : ext::range(0, ids.size()))
                {
                    /// Modifying cache with write lock
                    /// (taken per row, so readers can interleave between rows).
                    ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs};
                    const auto id = ids[i];

                    const auto cell_idx = findCellIdxForSet(id);
                    auto & cell = cells[cell_idx];

                    auto it = map_ids.find(id);

                    /// We have some extra keys from source. Won't add them to cache.
                    if (it == map_ids.end())
                        continue;

                    auto & all_attributes = it->second;
                    all_attributes.found = true;
                    all_attributes.values = getAttributeValuesFromBlockAtPosition(column_ptrs, i);

                    /// Write every attribute of this row into the cell's typed arrays.
                    for (const auto attribute_idx : ext::range(0, attributes.size()))
                    {
                        const auto & attribute_column = *column_ptrs[attribute_idx];
                        auto & attribute = attributes[attribute_idx];

                        setAttributeValue(attribute, cell_idx, attribute_column[i]);
                    }

                    /// if cell id is zero and zero does not map to this cell, then the cell is unused
                    if (cell.id == 0 && cell_idx != zero_cell_idx)
                        element_count.fetch_add(1, std::memory_order_relaxed);

                    cell.id = id;
                    setLifetime(cell, now);
                }
            }

            stream->readSuffix();

            /// Lock for cache modification
            ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs};

            /// Requested keys the source did not return: remember them as "default"
            /// cells so repeated lookups don't hit the source until they expire.
            for (auto & [key, value] : update_unit_ptr->found_ids)
            {
                if (!value.found)
                {
                    auto cell_idx = findCellIdxForSet(key);
                    auto & cell = cells[cell_idx];
                    cell.id = key;
                    setLifetime(cell, now);
                    cell.setDefault();
                }
            }

            /// Successful round trip - reset the error backoff state.
            error_count = 0;
            last_exception = std::exception_ptr{};
            backoff_end_time = std::chrono::system_clock::time_point{};

            ProfileEvents::increment(ProfileEvents::DictCacheRequestTimeNs, watch.elapsed());
        }
        catch (...)
        {
            /// Lock just for last_exception safety
            ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs};

            /// Record the failure and push the next allowed attempt into the future
            /// (exponential-style backoff computed from the consecutive error count).
            ++error_count;
            last_exception = std::current_exception();
            backoff_end_time = now + std::chrono::seconds(calculateDurationWithBackoff(rnd_engine, error_count));

            tryLogException(last_exception, log,
                            "Could not update cache dictionary '" + getDictionaryID().getNameForLogs() +
                            "', next update is scheduled at " + ext::to_string(backoff_end_time.load()));
            try
            {
                std::rethrow_exception(last_exception);
            }
            catch (...)
            {
                /// Rewrap: the waiter must not share (and possibly mutate) the same
                /// exception object as other threads.
                throw DB::Exception(ErrorCodes::CACHE_DICTIONARY_UPDATE_FAIL,
                                    "Update failed for dictionary {} : {}",
                                    getDictionaryID().getNameForLogs(),
                                    getCurrentExceptionMessage(true /*with stack trace*/,
                                                               true /*check embedded stack trace*/));
            }
        }

        ProfileEvents::increment(ProfileEvents::DictCacheKeysRequestedMiss, update_unit_ptr->requested_ids.size() - found_num);
        ProfileEvents::increment(ProfileEvents::DictCacheKeysRequestedFound, found_num);
        ProfileEvents::increment(ProfileEvents::DictCacheRequests);
    }
    else
    {
        /// Won't request source for keys
        throw DB::Exception(ErrorCodes::CACHE_DICTIONARY_UPDATE_FAIL,
                            "Query contains keys that are not present in cache or expired. Could not update cache dictionary {} now, because nearest update is scheduled at {}. Try again later.",
                            getDictionaryID().getNameForLogs(),
                            ext::to_string(backoff_end_time.load()));
    }
}
2019-12-26 18:56:34 +00:00
2016-06-07 21:07:44 +00:00
}