#include <cassert>
#include <iomanip>
#include <thread>
#include <future>

#include <Poco/Version.h>
#include <Poco/Util/Application.h>

#include <Common/Stopwatch.h>
#include <Common/setThreadName.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnTuple.h>
#include <Columns/ColumnLowCardinality.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <DataStreams/NullBlockInputStream.h>
#include <DataStreams/materializeBlock.h>
#include <IO/WriteBufferFromFile.h>
#include <Compression/CompressedWriteBuffer.h>
#include <Interpreters/Aggregator.h>
#include <Common/ClickHouseRevision.h>
#include <Common/MemoryTracker.h>
#include <Common/CurrentThread.h>
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
#include <common/demangle.h>
#include <common/config_common.h>
#include <AggregateFunctions/AggregateFunctionArray.h>
#include <AggregateFunctions/AggregateFunctionState.h>

namespace ProfileEvents
{
    extern const Event ExternalAggregationWritePart;
    extern const Event ExternalAggregationCompressedBytes;
    extern const Event ExternalAggregationUncompressedBytes;
}

namespace CurrentMetrics
{
    extern const Metric QueryThread;
}

namespace DB
{

namespace ErrorCodes
{
    extern const int TOO_MANY_ROWS;
    extern const int EMPTY_DATA_PASSED;
    extern const int CANNOT_MERGE_DIFFERENT_AGGREGATED_DATA_VARIANTS;
    extern const int LOGICAL_ERROR;
    extern const int UNKNOWN_AGGREGATED_DATA_VARIANT;
    extern const int NOT_ENOUGH_SPACE;
}

AggregatedDataVariants::~AggregatedDataVariants()
{
    if (aggregator && !aggregator->all_aggregates_has_trivial_destructor)
    {
        try
        {
            aggregator->destroyAllAggregateStates(*this);
        }
        catch (...)
        {
            tryLogCurrentException(__PRETTY_FUNCTION__);
        }
    }
}

void AggregatedDataVariants::convertToTwoLevel()
{
    if (aggregator)
        LOG_TRACE(aggregator->log, "Converting aggregation data to two-level.");

    switch (type)
    {
    #define M(NAME) \
        case Type::NAME: \
            NAME ## _two_level = std::make_unique<decltype(NAME ## _two_level)::element_type>(*NAME); \
            NAME.reset(); \
            type = Type::NAME ## _two_level; \
            break;
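
    /// For illustration, M(key32) expands to:
    ///
    ///     case Type::key32:
    ///         key32_two_level = std::make_unique<decltype(key32_two_level)::element_type>(*key32);
    ///         key32.reset();
    ///         type = Type::key32_two_level;
    ///         break;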

        APPLY_FOR_VARIANTS_CONVERTIBLE_TO_TWO_LEVEL(M)

    #undef M

        default:
            throw Exception("Wrong data variant passed.", ErrorCodes::LOGICAL_ERROR);
    }
}

Block Aggregator::getHeader(bool final) const
{
    Block res;

    if (params.src_header)
    {
        for (size_t i = 0; i < params.keys_size; ++i)
            res.insert(params.src_header.safeGetByPosition(params.keys[i]).cloneEmpty());

        for (size_t i = 0; i < params.aggregates_size; ++i)
        {
            size_t arguments_size = params.aggregates[i].arguments.size();
            DataTypes argument_types(arguments_size);
            for (size_t j = 0; j < arguments_size; ++j)
                argument_types[j] = params.src_header.safeGetByPosition(params.aggregates[i].arguments[j]).type;

            DataTypePtr type;
            if (final)
                type = params.aggregates[i].function->getReturnType();
            else
                type = std::make_shared<DataTypeAggregateFunction>(params.aggregates[i].function, argument_types, params.aggregates[i].parameters);

            res.insert({ type, params.aggregates[i].column_name });
        }
    }
    else if (params.intermediate_header)
    {
        res = params.intermediate_header.cloneEmpty();

        if (final)
        {
            for (size_t i = 0; i < params.aggregates_size; ++i)
            {
                auto & elem = res.getByPosition(params.keys_size + i);

                elem.type = params.aggregates[i].function->getReturnType();
                elem.column = elem.type->createColumn();
            }
        }
    }

    return materializeBlock(res);
}
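
/// Illustrative example (assumed, not from the original comments): for `GROUP BY k` with
/// aggregate `sum(x)`, the header is (k, sum(x)); with final == false the sum(x) column
/// has type AggregateFunction(sum, <type of x>) and carries opaque per-group states,
/// with final == true it has the return type of sum.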

Aggregator::Aggregator(const Params & params_)
    : params(params_),
    isCancelled([]() { return false; })
{
    /// Use query-level memory tracker
    if (auto memory_tracker_child = CurrentThread::getMemoryTracker())
        if (auto memory_tracker = memory_tracker_child->getParent())
            memory_usage_before_aggregation = memory_tracker->get();

    aggregate_functions.resize(params.aggregates_size);
    for (size_t i = 0; i < params.aggregates_size; ++i)
        aggregate_functions[i] = params.aggregates[i].function.get();

    /// Initialize sizes of aggregation states and their offsets.
    offsets_of_aggregate_states.resize(params.aggregates_size);
    total_size_of_aggregate_states = 0;
    all_aggregates_has_trivial_destructor = true;

    // aggregate_states will be aligned as below:
    // |<-- state_1 -->|<-- pad_1 -->|<-- state_2 -->|<-- pad_2 -->| .....
    //
    // pad_N is used to satisfy the alignment requirement of the next state.
    // The address of state_1 is aligned based on the maximum alignment requirement among the states.
    for (size_t i = 0; i < params.aggregates_size; ++i)
    {
        offsets_of_aggregate_states[i] = total_size_of_aggregate_states;

        total_size_of_aggregate_states += params.aggregates[i].function->sizeOfData();

        // Aggregate states are aligned based on the maximum requirement.
        align_aggregate_states = std::max(align_aggregate_states, params.aggregates[i].function->alignOfData());

        // If this is not the last aggregate_state, pad it so that the next aggregate_state is aligned.
        if (i + 1 < params.aggregates_size)
        {
            size_t alignment_of_next_state = params.aggregates[i + 1].function->alignOfData();
            if ((alignment_of_next_state & (alignment_of_next_state - 1)) != 0)
                throw Exception("Logical error: alignOfData is not 2^N", ErrorCodes::LOGICAL_ERROR);

            /// Extend total_size to the next alignment requirement:
            /// add padding by rounding 'total_size_of_aggregate_states' up to a multiple of alignment_of_next_state.
            total_size_of_aggregate_states = (total_size_of_aggregate_states + alignment_of_next_state - 1) / alignment_of_next_state * alignment_of_next_state;
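
            /// Worked example (illustrative): if the states so far occupy 12 bytes and the next
            /// state has alignOfData() == 8, then (12 + 8 - 1) / 8 * 8 == 16, i.e. 4 bytes of padding.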
        }

        if (!params.aggregates[i].function->hasTrivialDestructor())
            all_aggregates_has_trivial_destructor = false;
    }

    method_chosen = chooseAggregationMethod();
    HashMethodContext::Settings cache_settings;
    cache_settings.max_threads = params.max_threads;
    aggregation_state_cache = AggregatedDataVariants::createCache(method_chosen, cache_settings);
}

AggregatedDataVariants::Type Aggregator::chooseAggregationMethod()
{
    /// If there are no keys, everything is aggregated into a single row.
    if (params.keys_size == 0)
        return AggregatedDataVariants::Type::without_key;

    /// Check if at least one of the specified keys is nullable.
    DataTypes types_removed_nullable;
    types_removed_nullable.reserve(params.keys.size());
    bool has_nullable_key = false;
    bool has_low_cardinality = false;

    for (const auto & pos : params.keys)
    {
        DataTypePtr type = (params.src_header ? params.src_header : params.intermediate_header).safeGetByPosition(pos).type;

        if (type->lowCardinality())
        {
            has_low_cardinality = true;
            type = removeLowCardinality(type);
        }

        if (type->isNullable())
        {
            has_nullable_key = true;
            type = removeNullable(type);
        }

        types_removed_nullable.push_back(type);
    }

    /** Returns ordinary (not two-level) methods, because we start from them.
      * Later, during the aggregation process, data may be converted (partitioned) to a two-level structure, if cardinality is high.
      */

    size_t keys_bytes = 0;
    size_t num_fixed_contiguous_keys = 0;

    key_sizes.resize(params.keys_size);
    for (size_t j = 0; j < params.keys_size; ++j)
    {
        if (types_removed_nullable[j]->isValueUnambiguouslyRepresentedInContiguousMemoryRegion())
        {
            if (types_removed_nullable[j]->isValueUnambiguouslyRepresentedInFixedSizeContiguousMemoryRegion())
            {
                ++num_fixed_contiguous_keys;
                key_sizes[j] = types_removed_nullable[j]->getSizeOfValueInMemory();
                keys_bytes += key_sizes[j];
            }
        }
    }

    if (has_nullable_key)
    {
        if (params.keys_size == num_fixed_contiguous_keys && !has_low_cardinality)
        {
            /// Pack if possible all the keys along with information about which key values are nulls
            /// into a fixed 16- or 32-byte blob.
            if (std::tuple_size<KeysNullMap<UInt128>>::value + keys_bytes <= 16)
                return AggregatedDataVariants::Type::nullable_keys128;
            if (std::tuple_size<KeysNullMap<UInt256>>::value + keys_bytes <= 32)
                return AggregatedDataVariants::Type::nullable_keys256;
        }

        if (has_low_cardinality && params.keys_size == 1)
        {
            if (types_removed_nullable[0]->isValueRepresentedByNumber())
            {
                size_t size_of_field = types_removed_nullable[0]->getSizeOfValueInMemory();

                if (size_of_field == 1)
                    return AggregatedDataVariants::Type::low_cardinality_key8;
                if (size_of_field == 2)
                    return AggregatedDataVariants::Type::low_cardinality_key16;
                if (size_of_field == 4)
                    return AggregatedDataVariants::Type::low_cardinality_key32;
                if (size_of_field == 8)
                    return AggregatedDataVariants::Type::low_cardinality_key64;
            }
            else if (isString(types_removed_nullable[0]))
                return AggregatedDataVariants::Type::low_cardinality_key_string;
            else if (isFixedString(types_removed_nullable[0]))
                return AggregatedDataVariants::Type::low_cardinality_key_fixed_string;
        }

        /// Fallback case.
        return AggregatedDataVariants::Type::serialized;
    }

    /// No key has been found to be nullable.

    /// Single numeric key.
    if (params.keys_size == 1 && types_removed_nullable[0]->isValueRepresentedByNumber())
    {
        size_t size_of_field = types_removed_nullable[0]->getSizeOfValueInMemory();

        if (has_low_cardinality)
        {
            if (size_of_field == 1)
                return AggregatedDataVariants::Type::low_cardinality_key8;
            if (size_of_field == 2)
                return AggregatedDataVariants::Type::low_cardinality_key16;
            if (size_of_field == 4)
                return AggregatedDataVariants::Type::low_cardinality_key32;
            if (size_of_field == 8)
                return AggregatedDataVariants::Type::low_cardinality_key64;
        }

        if (size_of_field == 1)
            return AggregatedDataVariants::Type::key8;
        if (size_of_field == 2)
            return AggregatedDataVariants::Type::key16;
        if (size_of_field == 4)
            return AggregatedDataVariants::Type::key32;
        if (size_of_field == 8)
            return AggregatedDataVariants::Type::key64;
        if (size_of_field == 16)
            return AggregatedDataVariants::Type::keys128;
        throw Exception("Logical error: numeric column has sizeOfField not in 1, 2, 4, 8, 16.", ErrorCodes::LOGICAL_ERROR);
    }

    /// If all keys fit into N bits, we will use a hash table with keys packed (placed contiguously) into a single N-bit value.
    if (params.keys_size == num_fixed_contiguous_keys)
    {
        if (has_low_cardinality)
        {
            if (keys_bytes <= 16)
                return AggregatedDataVariants::Type::low_cardinality_keys128;
            if (keys_bytes <= 32)
                return AggregatedDataVariants::Type::low_cardinality_keys256;
        }

        if (keys_bytes <= 16)
            return AggregatedDataVariants::Type::keys128;
        if (keys_bytes <= 32)
            return AggregatedDataVariants::Type::keys256;
    }

    /// If there is a single string key, we will use a hash table with references to it. The strings themselves are stored separately in an Arena.
    if (params.keys_size == 1 && isString(types_removed_nullable[0]))
    {
        if (has_low_cardinality)
            return AggregatedDataVariants::Type::low_cardinality_key_string;
        else
            return AggregatedDataVariants::Type::key_string;
    }

    if (params.keys_size == 1 && isFixedString(types_removed_nullable[0]))
    {
        if (has_low_cardinality)
            return AggregatedDataVariants::Type::low_cardinality_key_fixed_string;
        else
            return AggregatedDataVariants::Type::key_fixed_string;
    }

    return AggregatedDataVariants::Type::serialized;
}
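
/// Illustrative outcomes (assumed examples): a single non-nullable UInt64 key chooses
/// Type::key64; two UInt32 keys (8 fixed contiguous bytes in total) choose Type::keys128;
/// a single String key chooses Type::key_string; anything that is neither fixed-size nor
/// a single (fixed) string falls back to Type::serialized.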

void Aggregator::createAggregateStates(AggregateDataPtr & aggregate_data) const
{
    for (size_t j = 0; j < params.aggregates_size; ++j)
    {
        try
        {
            /** An exception may occur if there is a shortage of memory.
              * In order that everything is then properly destroyed, we "roll back" some of the created states.
              * The code is not very convenient.
              */
            aggregate_functions[j]->create(aggregate_data + offsets_of_aggregate_states[j]);
        }
        catch (...)
        {
            for (size_t rollback_j = 0; rollback_j < j; ++rollback_j)
                aggregate_functions[rollback_j]->destroy(aggregate_data + offsets_of_aggregate_states[rollback_j]);

            throw;
        }
    }
}

/** It's interesting: if you remove `noinline`, gcc for some reason inlines this function, and performance decreases (~ 10%).
  * (Probably because, after inlining this function, more internal functions are no longer inlined.)
  * Inlining makes no sense, since the inner loop is entirely inside this function.
  */
template <typename Method>
void NO_INLINE Aggregator::executeImpl(
    Method & method,
    Arena * aggregates_pool,
    size_t rows,
    ColumnRawPtrs & key_columns,
    AggregateFunctionInstruction * aggregate_instructions,
    bool no_more_keys,
    AggregateDataPtr overflow_row) const
{
    typename Method::State state(key_columns, key_sizes, aggregation_state_cache);

    if (!no_more_keys)
        //executeImplCase<false>(method, state, aggregates_pool, rows, aggregate_instructions, overflow_row);
        executeImplBatch(method, state, aggregates_pool, rows, aggregate_instructions);
    else
        executeImplCase<true>(method, state, aggregates_pool, rows, aggregate_instructions, overflow_row);
}

template <bool no_more_keys, typename Method>
void NO_INLINE Aggregator::executeImplCase(
    Method & method,
    typename Method::State & state,
    Arena * aggregates_pool,
    size_t rows,
    AggregateFunctionInstruction * aggregate_instructions,
    AggregateDataPtr overflow_row) const
{
    /// NOTE When editing this code, also pay attention to SpecializedAggregator.h.

    /// For all rows.
    for (size_t i = 0; i < rows; ++i)
    {
        AggregateDataPtr aggregate_data = nullptr;

        if constexpr (!no_more_keys)  /// Insert.
        {
            auto emplace_result = state.emplaceKey(method.data, i, *aggregates_pool);

            /// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key.
            if (emplace_result.isInserted())
            {
                /// Exception safety: if we cannot allocate memory or create states, destructors will not be called.
                emplace_result.setMapped(nullptr);

                aggregate_data = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
                createAggregateStates(aggregate_data);

                emplace_result.setMapped(aggregate_data);
            }
            else
                aggregate_data = emplace_result.getMapped();
        }
        else
        {
            /// Add only if the key already exists.
            auto find_result = state.findKey(method.data, i, *aggregates_pool);
            if (find_result.isFound())
                aggregate_data = find_result.getMapped();
        }

        /// aggregate_data == nullptr means that the new key did not fit in the hash table because of no_more_keys.

        /// If the key does not fit, and the data does not need to be aggregated into a separate row, then there's nothing to do.
        if (!aggregate_data && !overflow_row)
            continue;

        AggregateDataPtr value = aggregate_data ? aggregate_data : overflow_row;

        /// Add values to the aggregate functions.
        for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
            (*inst->func)(inst->that, value + inst->state_offset, inst->arguments, i, aggregates_pool);
    }
}
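
/// Note: aggregate_instructions is a sentinel-terminated array. executeOnBlock allocates
/// params.aggregates_size + 1 entries and sets `that` of the last one to nullptr, which is
/// why the instruction loops here and in executeImplBatch iterate while `inst->that` is non-null.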

template <typename Method>
void NO_INLINE Aggregator::executeImplBatch(
    Method & method,
    typename Method::State & state,
    Arena * aggregates_pool,
    size_t rows,
    AggregateFunctionInstruction * aggregate_instructions) const
{
    PODArray<AggregateDataPtr> places(rows);

    /// For all rows.
    for (size_t i = 0; i < rows; ++i)
    {
        AggregateDataPtr aggregate_data = nullptr;

        auto emplace_result = state.emplaceKey(method.data, i, *aggregates_pool);

        /// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key.
        if (emplace_result.isInserted())
        {
            /// Exception safety: if we cannot allocate memory or create states, destructors will not be called.
            emplace_result.setMapped(nullptr);

            aggregate_data = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
            createAggregateStates(aggregate_data);

            emplace_result.setMapped(aggregate_data);
        }
        else
            aggregate_data = emplace_result.getMapped();

        places[i] = aggregate_data;
        assert(places[i] != nullptr);
    }

    /// Add values to the aggregate functions.
    for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
    {
        if (inst->offsets)
            inst->batch_that->addBatchArray(rows, places.data(), inst->state_offset, inst->batch_arguments, inst->offsets, aggregates_pool);
        else
            inst->batch_that->addBatch(rows, places.data(), inst->state_offset, inst->batch_arguments, aggregates_pool);
    }
}
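
/// Compared with executeImplCase, this batch variant first resolves the destination state
/// for every row and then issues one addBatch/addBatchArray call per aggregate function,
/// rather than one add call per (row, function) pair; presumably this is why it is used
/// as the default path when no_more_keys is false (see executeImpl above).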

void NO_INLINE Aggregator::executeWithoutKeyImpl(
    AggregatedDataWithoutKey & res,
    size_t rows,
    AggregateFunctionInstruction * aggregate_instructions,
    Arena * arena) const
{
    /// Adding values
    for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
    {
        if (inst->offsets)
            /// For -Array arguments, the last offset equals the total number of nested elements, i.e. the number of flattened rows to add.
            inst->batch_that->addBatchSinglePlace(
                inst->offsets[static_cast<ssize_t>(rows - 1)], res + inst->state_offset, inst->batch_arguments, arena);
        else
            inst->batch_that->addBatchSinglePlace(rows, res + inst->state_offset, inst->batch_arguments, arena);
    }
}

bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & result,
    ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, bool & no_more_keys)
{
    UInt64 num_rows = block.rows();
    return executeOnBlock(block.getColumns(), num_rows, result, key_columns, aggregate_columns, no_more_keys);
}


bool Aggregator::executeOnBlock(Columns columns, UInt64 num_rows, AggregatedDataVariants & result,
    ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, bool & no_more_keys)
{
    if (isCancelled())
        return true;

    /// `result` will destroy the states of aggregate functions in the destructor
    result.aggregator = this;

    /// How to perform the aggregation?
    if (result.empty())
    {
        result.init(method_chosen);
        result.keys_size = params.keys_size;
        result.key_sizes = key_sizes;
        LOG_TRACE(log, "Aggregation method: " << result.getMethodName());
    }

    if (isCancelled())
        return true;

    for (size_t i = 0; i < params.aggregates_size; ++i)
        aggregate_columns[i].resize(params.aggregates[i].arguments.size());

    /** Constant columns are not supported directly during aggregation.
      * To make them work anyway, we materialize them.
      */
    Columns materialized_columns;

    /// Remember the columns we will work with
    for (size_t i = 0; i < params.keys_size; ++i)
    {
        materialized_columns.push_back(columns.at(params.keys[i])->convertToFullColumnIfConst());
        key_columns[i] = materialized_columns.back().get();

        if (!result.isLowCardinality())
        {
            auto column_no_lc = recursiveRemoveLowCardinality(key_columns[i]->getPtr());
            if (column_no_lc.get() != key_columns[i])
            {
                materialized_columns.emplace_back(std::move(column_no_lc));
                key_columns[i] = materialized_columns.back().get();
            }
        }
    }

    AggregateFunctionInstructions aggregate_functions_instructions(params.aggregates_size + 1);
    aggregate_functions_instructions[params.aggregates_size].that = nullptr;

    std::vector<std::vector<const IColumn *>> nested_columns_holder;
    for (size_t i = 0; i < params.aggregates_size; ++i)
    {
        for (size_t j = 0; j < aggregate_columns[i].size(); ++j)
        {
            materialized_columns.push_back(columns.at(params.aggregates[i].arguments[j])->convertToFullColumnIfConst());
            aggregate_columns[i][j] = materialized_columns.back().get();

            auto column_no_lc = recursiveRemoveLowCardinality(aggregate_columns[i][j]->getPtr());
            if (column_no_lc.get() != aggregate_columns[i][j])
            {
                materialized_columns.emplace_back(std::move(column_no_lc));
                aggregate_columns[i][j] = materialized_columns.back().get();
            }
        }

        aggregate_functions_instructions[i].arguments = aggregate_columns[i].data();
        aggregate_functions_instructions[i].state_offset = offsets_of_aggregate_states[i];
        auto that = aggregate_functions[i];

        /// Unnest consecutive trailing -State combinators
        while (auto func = typeid_cast<const AggregateFunctionState *>(that))
            that = func->getNestedFunction().get();
        aggregate_functions_instructions[i].that = that;
        aggregate_functions_instructions[i].func = that->getAddressOfAddFunction();

        if (auto func = typeid_cast<const AggregateFunctionArray *>(that))
        {
            /// Unnest consecutive -State combinators before -Array
            that = func->getNestedFunction().get();
            while (auto nested_func = typeid_cast<const AggregateFunctionState *>(that))
                that = nested_func->getNestedFunction().get();

            auto [nested_columns, offsets] = checkAndGetNestedArrayOffset(aggregate_columns[i].data(), that->getArgumentTypes().size());
            nested_columns_holder.push_back(std::move(nested_columns));
            aggregate_functions_instructions[i].batch_arguments = nested_columns_holder.back().data();
            aggregate_functions_instructions[i].offsets = offsets;
        }
        else
            aggregate_functions_instructions[i].batch_arguments = aggregate_columns[i].data();

        aggregate_functions_instructions[i].batch_that = that;
    }

    if (isCancelled())
        return true;

    if ((params.overflow_row || result.type == AggregatedDataVariants::Type::without_key) && !result.without_key)
    {
        AggregateDataPtr place = result.aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
        createAggregateStates(place);
        result.without_key = place;
    }

    /// We select one of the aggregation methods and call it.

    /// For the case when there are no keys (all aggregate into one row).
    if (result.type == AggregatedDataVariants::Type::without_key)
    {
        executeWithoutKeyImpl(result.without_key, num_rows, aggregate_functions_instructions.data(), result.aggregates_pool);
    }
    else
    {
        /// This is where data is written that does not fit in `max_rows_to_group_by` with `group_by_overflow_mode = any`.
        AggregateDataPtr overflow_row_ptr = params.overflow_row ? result.without_key : nullptr;

    #define M(NAME, IS_TWO_LEVEL) \
        else if (result.type == AggregatedDataVariants::Type::NAME) \
            executeImpl(*result.NAME, result.aggregates_pool, num_rows, key_columns, aggregate_functions_instructions.data(), \
                no_more_keys, overflow_row_ptr);

        if (false) {}
        APPLY_FOR_AGGREGATED_VARIANTS(M)
    #undef M
    }

    size_t result_size = result.sizeWithoutOverflowRow();
    Int64 current_memory_usage = 0;
    if (auto memory_tracker_child = CurrentThread::getMemoryTracker())
        if (auto memory_tracker = memory_tracker_child->getParent())
            current_memory_usage = memory_tracker->get();

    auto result_size_bytes = current_memory_usage - memory_usage_before_aggregation;    /// Here all the results in the sum are taken into account, from different threads.

    bool worth_convert_to_two_level
        = (params.group_by_two_level_threshold && result_size >= params.group_by_two_level_threshold)
        || (params.group_by_two_level_threshold_bytes && result_size_bytes >= static_cast<Int64>(params.group_by_two_level_threshold_bytes));

    /** Converting to a two-level data structure.
      * It allows the subsequent merge to be efficient: either economical in memory or parallel.
      */
    if (result.isConvertibleToTwoLevel() && worth_convert_to_two_level)
        result.convertToTwoLevel();

    /// Checking the constraints.
    if (!checkLimits(result_size, no_more_keys))
        return false;

    /** Flush data to disk if too much RAM is consumed.
      * Data can only be flushed to disk if a two-level aggregation structure is used.
      */
    if (params.max_bytes_before_external_group_by
        && result.isTwoLevel()
        && current_memory_usage > static_cast<Int64>(params.max_bytes_before_external_group_by)
        && worth_convert_to_two_level)
    {
        if (!enoughSpaceInDirectory(params.tmp_path, current_memory_usage + params.min_free_disk_space))
            throw Exception("Not enough space for external aggregation in " + params.tmp_path, ErrorCodes::NOT_ENOUGH_SPACE);

        writeToTemporaryFile(result);
    }

    return true;
}

void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants)
{
    Stopwatch watch;
    size_t rows = data_variants.size();

    auto file = createTemporaryFile(params.tmp_path);
    const std::string & path = file->path();
    WriteBufferFromFile file_buf(path);
    CompressedWriteBuffer compressed_buf(file_buf);
    NativeBlockOutputStream block_out(compressed_buf, ClickHouseRevision::get(), getHeader(false));

    LOG_DEBUG(log, "Writing part of aggregation data into temporary file " << path << ".");
    ProfileEvents::increment(ProfileEvents::ExternalAggregationWritePart);

    /// Flush only two-level data and possibly overflow data.

#define M(NAME) \
    else if (data_variants.type == AggregatedDataVariants::Type::NAME) \
        writeToTemporaryFileImpl(data_variants, *data_variants.NAME, block_out);

    if (false) {}
    APPLY_FOR_VARIANTS_TWO_LEVEL(M)
#undef M
    else
        throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);

    /// NOTE Instead of freeing up memory and creating new hash tables and arenas, you can re-use the old ones.
    data_variants.init(data_variants.type);
    data_variants.aggregates_pools = Arenas(1, std::make_shared<Arena>());
    data_variants.aggregates_pool = data_variants.aggregates_pools.back().get();
    data_variants.without_key = nullptr;

    block_out.flush();
    compressed_buf.next();
    file_buf.next();

    double elapsed_seconds = watch.elapsedSeconds();
    double compressed_bytes = file_buf.count();
    double uncompressed_bytes = compressed_buf.count();

    {
        std::lock_guard lock(temporary_files.mutex);
        temporary_files.files.emplace_back(std::move(file));
        temporary_files.sum_size_uncompressed += uncompressed_bytes;
        temporary_files.sum_size_compressed += compressed_bytes;
    }

    ProfileEvents::increment(ProfileEvents::ExternalAggregationCompressedBytes, compressed_bytes);
    ProfileEvents::increment(ProfileEvents::ExternalAggregationUncompressedBytes, uncompressed_bytes);

    LOG_TRACE(log, std::fixed << std::setprecision(3)
        << "Written part in " << elapsed_seconds << " sec., "
        << rows << " rows, "
        << (uncompressed_bytes / 1048576.0) << " MiB uncompressed, "
        << (compressed_bytes / 1048576.0) << " MiB compressed, "
        << (uncompressed_bytes / rows) << " uncompressed bytes per row, "
        << (compressed_bytes / rows) << " compressed bytes per row, "
        << "compression rate: " << (uncompressed_bytes / compressed_bytes)
        << " (" << (rows / elapsed_seconds) << " rows/sec., "
        << (uncompressed_bytes / elapsed_seconds / 1048576.0) << " MiB/sec. uncompressed, "
        << (compressed_bytes / elapsed_seconds / 1048576.0) << " MiB/sec. compressed)");
}

template <typename Method>
Block Aggregator::convertOneBucketToBlock(
    AggregatedDataVariants & data_variants,
    Method & method,
    bool final,
    size_t bucket) const
{
    Block block = prepareBlockAndFill(data_variants, final, method.data.impls[bucket].size(),
        [bucket, &method, this] (
            MutableColumns & key_columns,
            AggregateColumnsData & aggregate_columns,
            MutableColumns & final_aggregate_columns,
            bool final_)
        {
            convertToBlockImpl(method, method.data.impls[bucket],
                key_columns, aggregate_columns, final_aggregate_columns, final_);
        });

    block.info.bucket_num = bucket;
    return block;
}

Block Aggregator::mergeAndConvertOneBucketToBlock(
    ManyAggregatedDataVariants & variants,
    Arena * arena,
    bool final,
    size_t bucket,
    std::atomic<bool> * is_cancelled) const
{
    auto & merged_data = *variants[0];
    auto method = merged_data.type;
    Block block;

    if (false) {}
#define M(NAME) \
    else if (method == AggregatedDataVariants::Type::NAME) \
    { \
        mergeBucketImpl<decltype(merged_data.NAME)::element_type>(variants, bucket, arena); \
        if (is_cancelled && is_cancelled->load(std::memory_order_seq_cst)) \
            return {}; \
        block = convertOneBucketToBlock(merged_data, *merged_data.NAME, final, bucket); \
    }

    APPLY_FOR_VARIANTS_TWO_LEVEL(M)
#undef M

    return block;
}

template <typename Method>
void Aggregator::writeToTemporaryFileImpl(
    AggregatedDataVariants & data_variants,
    Method & method,
    IBlockOutputStream & out)
{
    size_t max_temporary_block_size_rows = 0;
    size_t max_temporary_block_size_bytes = 0;

    auto update_max_sizes = [&](const Block & block)
    {
        size_t block_size_rows = block.rows();
        size_t block_size_bytes = block.bytes();

        if (block_size_rows > max_temporary_block_size_rows)
            max_temporary_block_size_rows = block_size_rows;
        if (block_size_bytes > max_temporary_block_size_bytes)
            max_temporary_block_size_bytes = block_size_bytes;
    };

    for (size_t bucket = 0; bucket < Method::Data::NUM_BUCKETS; ++bucket)
    {
        Block block = convertOneBucketToBlock(data_variants, method, false, bucket);
        out.write(block);
        update_max_sizes(block);
    }

    if (params.overflow_row)
    {
        Block block = prepareBlockAndFillWithoutKey(data_variants, false, true);
        out.write(block);
        update_max_sizes(block);
    }

    /// Pass ownership of the aggregate functions states:
    /// `data_variants` will not destroy them in the destructor, they are now owned by ColumnAggregateFunction objects.
    data_variants.aggregator = nullptr;

    LOG_TRACE(log, std::fixed << std::setprecision(3)
        << "Max size of temporary block: " << max_temporary_block_size_rows << " rows, "
        << (max_temporary_block_size_bytes / 1048576.0) << " MiB.");
}
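
/// The resulting temporary file is thus a compressed stream of Native-format blocks:
/// one non-final block per bucket (each tagged with its bucket_num by convertOneBucketToBlock),
/// plus one overflow block at the end when params.overflow_row is set.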

bool Aggregator::checkLimits(size_t result_size, bool & no_more_keys) const
{
    if (!no_more_keys && params.max_rows_to_group_by && result_size > params.max_rows_to_group_by)
    {
        switch (params.group_by_overflow_mode)
        {
            case OverflowMode::THROW:
                throw Exception("Limit for rows to GROUP BY exceeded: has " + toString(result_size)
                    + " rows, maximum: " + toString(params.max_rows_to_group_by),
                    ErrorCodes::TOO_MANY_ROWS);

            case OverflowMode::BREAK:
                return false;

            case OverflowMode::ANY:
                no_more_keys = true;
                break;
        }
    }

    return true;
}
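
/// In short: once the GROUP BY row limit is exceeded, THROW aborts the query, BREAK makes
/// executeOnBlock return false so the caller stops feeding data, and ANY sets no_more_keys
/// so that already-seen keys keep aggregating while rows with new keys are either dropped
/// or, if params.overflow_row is set, accumulated into the single overflow row (see executeImplCase).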

void Aggregator::execute(const BlockInputStreamPtr & stream, AggregatedDataVariants & result)
{
    if (isCancelled())
        return;

    ColumnRawPtrs key_columns(params.keys_size);
    AggregateColumns aggregate_columns(params.aggregates_size);

    /** Used if there is a limit on the maximum number of rows in the aggregation,
      * and if group_by_overflow_mode == ANY.
      * In this case, new keys are not added to the set, and aggregation is performed only on
      * keys that have already made it into the set.
      */
    bool no_more_keys = false;

    LOG_TRACE(log, "Aggregating");

    Stopwatch watch;

    size_t src_rows = 0;
    size_t src_bytes = 0;

    /// Read all the data
    while (Block block = stream->read())
    {
        if (isCancelled())
            return;

        src_rows += block.rows();
        src_bytes += block.bytes();

        if (!executeOnBlock(block, result, key_columns, aggregate_columns, no_more_keys))
            break;
    }

    /// If there was no data, and we aggregate without keys, we must return a single row with the result of empty aggregation.
    /// To do this, we pass a block with zero rows to aggregate.
    if (result.empty() && params.keys_size == 0 && !params.empty_result_for_aggregation_by_empty_set)
        executeOnBlock(stream->getHeader(), result, key_columns, aggregate_columns, no_more_keys);

    double elapsed_seconds = watch.elapsedSeconds();
    size_t rows = result.sizeWithoutOverflowRow();
    LOG_TRACE(log, std::fixed << std::setprecision(3)
        << "Aggregated. " << src_rows << " to " << rows << " rows (from " << src_bytes / 1048576.0 << " MiB)"
        << " in " << elapsed_seconds << " sec."
        << " (" << src_rows / elapsed_seconds << " rows/sec., " << src_bytes / elapsed_seconds / 1048576.0 << " MiB/sec.)");
}

template <typename Method, typename Table>
void Aggregator::convertToBlockImpl(
    Method & method,
    Table & data,
    MutableColumns & key_columns,
    AggregateColumnsData & aggregate_columns,
    MutableColumns & final_aggregate_columns,
    bool final) const
{
    if (data.empty())
        return;

    if (key_columns.size() != params.keys_size)
        throw Exception{"Aggregate. Unexpected key columns size.", ErrorCodes::LOGICAL_ERROR};

    if (final)
        convertToBlockImplFinal(method, data, key_columns, final_aggregate_columns);
    else
        convertToBlockImplNotFinal(method, data, key_columns, aggregate_columns);

    /// In order to release memory early.
    data.clearAndShrink();
}

template <typename Method, typename Table>
void NO_INLINE Aggregator::convertToBlockImplFinal(
    Method & method,
    Table & data,
    MutableColumns & key_columns,
    MutableColumns & final_aggregate_columns) const
{
    if constexpr (Method::low_cardinality_optimization)
    {
        if (data.hasNullKeyData())
        {
            key_columns[0]->insertDefault();

            for (size_t i = 0; i < params.aggregates_size; ++i)
                aggregate_functions[i]->insertResultInto(
                    data.getNullKeyData() + offsets_of_aggregate_states[i],
                    *final_aggregate_columns[i]);
        }
    }

    data.forEachValue([&](const auto & key, auto & mapped)
    {
        method.insertKeyIntoColumns(key, key_columns, key_sizes);

        for (size_t i = 0; i < params.aggregates_size; ++i)
            aggregate_functions[i]->insertResultInto(
                mapped + offsets_of_aggregate_states[i],
                *final_aggregate_columns[i]);
    });

    destroyImpl<Method>(data);
}

template <typename Method, typename Table>
void NO_INLINE Aggregator::convertToBlockImplNotFinal(
    Method & method,
    Table & data,
    MutableColumns & key_columns,
    AggregateColumnsData & aggregate_columns) const
{
    if constexpr (Method::low_cardinality_optimization)
    {
        if (data.hasNullKeyData())
        {
            key_columns[0]->insertDefault();

            for (size_t i = 0; i < params.aggregates_size; ++i)
                aggregate_columns[i]->push_back(data.getNullKeyData() + offsets_of_aggregate_states[i]);
        }
    }

    data.forEachValue([&](const auto & key, auto & mapped)
    {
        method.insertKeyIntoColumns(key, key_columns, key_sizes);

        /// reserved, so push_back does not throw exceptions
        for (size_t i = 0; i < params.aggregates_size; ++i)
            aggregate_columns[i]->push_back(mapped + offsets_of_aggregate_states[i]);

        /// Ownership of the state is transferred to the ColumnAggregateFunction; the hash table no longer points to it.
        mapped = nullptr;
    });
}

template <typename Filler>
Block Aggregator::prepareBlockAndFill(
    AggregatedDataVariants & data_variants,
    bool final,
    size_t rows,
    Filler && filler) const
{
    MutableColumns key_columns(params.keys_size);
    MutableColumns aggregate_columns(params.aggregates_size);
    MutableColumns final_aggregate_columns(params.aggregates_size);
    AggregateColumnsData aggregate_columns_data(params.aggregates_size);

    Block header = getHeader(final);

    for (size_t i = 0; i < params.keys_size; ++i)
    {
        key_columns[i] = header.safeGetByPosition(i).type->createColumn();
        key_columns[i]->reserve(rows);
    }

    for (size_t i = 0; i < params.aggregates_size; ++i)
    {
        if (!final)
        {
            aggregate_columns[i] = header.safeGetByPosition(i + params.keys_size).type->createColumn();

            /// The ColumnAggregateFunction column captures the shared ownership of the arena with the aggregate function states.
            ColumnAggregateFunction & column_aggregate_func = assert_cast<ColumnAggregateFunction &>(*aggregate_columns[i]);

            for (size_t j = 0; j < data_variants.aggregates_pools.size(); ++j)
                column_aggregate_func.addArena(data_variants.aggregates_pools[j]);

            aggregate_columns_data[i] = &column_aggregate_func.getData();
            aggregate_columns_data[i]->reserve(rows);
        }
        else
        {
            final_aggregate_columns[i] = aggregate_functions[i]->getReturnType()->createColumn();
            final_aggregate_columns[i]->reserve(rows);

            if (aggregate_functions[i]->isState())
            {
                /// The ColumnAggregateFunction column captures the shared ownership of the arena with aggregate function states.
                ColumnAggregateFunction & column_aggregate_func = assert_cast<ColumnAggregateFunction &>(*final_aggregate_columns[i]);

                for (size_t j = 0; j < data_variants.aggregates_pools.size(); ++j)
                    column_aggregate_func.addArena(data_variants.aggregates_pools[j]);
            }
        }
    }

    filler(key_columns, aggregate_columns_data, final_aggregate_columns, final);

    Block res = header.cloneEmpty();

    for (size_t i = 0; i < params.keys_size; ++i)
        res.getByPosition(i).column = std::move(key_columns[i]);

    for (size_t i = 0; i < params.aggregates_size; ++i)
    {
        if (final)
            res.getByPosition(i + params.keys_size).column = std::move(final_aggregate_columns[i]);
        else
            res.getByPosition(i + params.keys_size).column = std::move(aggregate_columns[i]);
    }

    /// Change the size of the columns-constants in the block.
    size_t columns = header.columns();
    for (size_t i = 0; i < columns; ++i)
        if (isColumnConst(*res.getByPosition(i).column))
            res.getByPosition(i).column = res.getByPosition(i).column->cut(0, rows);

    return res;
}
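
/// The Filler callback receives (key_columns, aggregate_columns_data, final_aggregate_columns,
/// final_) and is expected to append key values to key_columns and, depending on final_, either
/// raw state pointers to aggregate_columns_data or finalized results to final_aggregate_columns;
/// see the filler lambdas in prepareBlockAndFillWithoutKey and prepareBlockAndFillSingleLevel below.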


Block Aggregator::prepareBlockAndFillWithoutKey(AggregatedDataVariants & data_variants, bool final, bool is_overflows) const
{
    size_t rows = 1;

    auto filler = [&data_variants, this](
        MutableColumns & key_columns,
        AggregateColumnsData & aggregate_columns,
        MutableColumns & final_aggregate_columns,
        bool final_)
    {
        if (data_variants.type == AggregatedDataVariants::Type::without_key || params.overflow_row)
        {
            AggregatedDataWithoutKey & data = data_variants.without_key;

            for (size_t i = 0; i < params.aggregates_size; ++i)
            {
                if (!final_)
                    aggregate_columns[i]->push_back(data + offsets_of_aggregate_states[i]);
                else
                    aggregate_functions[i]->insertResultInto(data + offsets_of_aggregate_states[i], *final_aggregate_columns[i]);
            }

            if (!final_)
                data = nullptr;

            if (params.overflow_row)
                for (size_t i = 0; i < params.keys_size; ++i)
                    key_columns[i]->insertDefault();
        }
    };

    Block block = prepareBlockAndFill(data_variants, final, rows, filler);

    if (is_overflows)
        block.info.is_overflows = true;

    if (final)
        destroyWithoutKey(data_variants);

    return block;
}


Block Aggregator::prepareBlockAndFillSingleLevel(AggregatedDataVariants & data_variants, bool final) const
{
    size_t rows = data_variants.sizeWithoutOverflowRow();

    auto filler = [&data_variants, this](
        MutableColumns & key_columns,
        AggregateColumnsData & aggregate_columns,
        MutableColumns & final_aggregate_columns,
        bool final_)
    {
    #define M(NAME) \
        else if (data_variants.type == AggregatedDataVariants::Type::NAME) \
            convertToBlockImpl(*data_variants.NAME, data_variants.NAME->data, \
                key_columns, aggregate_columns, final_aggregate_columns, final_);

        if (false) {}
        APPLY_FOR_VARIANTS_SINGLE_LEVEL(M)
    #undef M
        else
            throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
    };

    return prepareBlockAndFill(data_variants, final, rows, filler);
}

BlocksList Aggregator::prepareBlocksAndFillTwoLevel(AggregatedDataVariants & data_variants, bool final, ThreadPool * thread_pool) const
{
#define M(NAME) \
    else if (data_variants.type == AggregatedDataVariants::Type::NAME) \
        return prepareBlocksAndFillTwoLevelImpl(data_variants, *data_variants.NAME, final, thread_pool);

    if (false) {}
    APPLY_FOR_VARIANTS_TWO_LEVEL(M)
#undef M
    else
        throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
}

template <typename Method>
BlocksList Aggregator::prepareBlocksAndFillTwoLevelImpl(
    AggregatedDataVariants & data_variants,
    Method & method,
    bool final,
    ThreadPool * thread_pool) const
{
    auto converter = [&](size_t bucket, ThreadGroupStatusPtr thread_group)
    {
        if (thread_group)
            CurrentThread::attachToIfDetached(thread_group);
        return convertOneBucketToBlock(data_variants, method, final, bucket);
    };

    /// packaged_task is used so that exceptions are automatically propagated to the main thread.
    std::vector<std::packaged_task<Block()>> tasks(Method::Data::NUM_BUCKETS);

    try
    {
        for (size_t bucket = 0; bucket < Method::Data::NUM_BUCKETS; ++bucket)
        {
            if (method.data.impls[bucket].empty())
                continue;

            tasks[bucket] = std::packaged_task<Block()>(std::bind(converter, bucket, CurrentThread::getGroup()));

            if (thread_pool)
                thread_pool->scheduleOrThrowOnError([bucket, &tasks] { tasks[bucket](); });
            else
                tasks[bucket]();
        }
    }
    catch (...)
    {
        /// If this is not done, then in case of an exception the tasks would be destroyed before the threads have finished using them.
        if (thread_pool)
            thread_pool->wait();

        throw;
    }

    if (thread_pool)
        thread_pool->wait();

    BlocksList blocks;

    for (auto & task : tasks)
    {
        if (!task.valid())
            continue;

        blocks.emplace_back(task.get_future().get());
    }

    return blocks;
}

BlocksList Aggregator::convertToBlocks(AggregatedDataVariants & data_variants, bool final, size_t max_threads) const
{
    if (isCancelled())
        return BlocksList();

    LOG_TRACE(log, "Converting aggregated data to blocks");

    Stopwatch watch;

    BlocksList blocks;

    /// In what data structure is the data aggregated?
    if (data_variants.empty())
        return blocks;

    std::unique_ptr<ThreadPool> thread_pool;
    if (max_threads > 1 && data_variants.sizeWithoutOverflowRow() > 100000  /// TODO Make a custom threshold.
        && data_variants.isTwoLevel())                                      /// TODO Use the shared thread pool with the `merge` function.
        thread_pool = std::make_unique<ThreadPool>(max_threads);

    if (isCancelled())
        return BlocksList();

    if (data_variants.without_key)
        blocks.emplace_back(prepareBlockAndFillWithoutKey(
            data_variants, final, data_variants.type != AggregatedDataVariants::Type::without_key));

    if (isCancelled())
        return BlocksList();

    if (data_variants.type != AggregatedDataVariants::Type::without_key)
    {
        if (!data_variants.isTwoLevel())
            blocks.emplace_back(prepareBlockAndFillSingleLevel(data_variants, final));
        else
            blocks.splice(blocks.end(), prepareBlocksAndFillTwoLevel(data_variants, final, thread_pool.get()));
    }

    if (!final)
    {
        /// data_variants will not destroy the states of aggregate functions in the destructor.
        /// Now ColumnAggregateFunction owns the states.
        data_variants.aggregator = nullptr;
    }

    if (isCancelled())
        return BlocksList();

    size_t rows = 0;
    size_t bytes = 0;

    for (const auto & block : blocks)
    {
        rows += block.rows();
        bytes += block.bytes();
    }

    double elapsed_seconds = watch.elapsedSeconds();
    LOG_TRACE(log, std::fixed << std::setprecision(3)
        << "Converted aggregated data to blocks. "
        << rows << " rows, " << bytes / 1048576.0 << " MiB"
        << " in " << elapsed_seconds << " sec."
        << " (" << rows / elapsed_seconds << " rows/sec., " << bytes / elapsed_seconds / 1048576.0 << " MiB/sec.)");

    return blocks;
}

template <typename Method, typename Table>
void NO_INLINE Aggregator::mergeDataNullKey(
    Table & table_dst,
    Table & table_src,
    Arena * arena) const
{
    if constexpr (Method::low_cardinality_optimization)
    {
        if (table_src.hasNullKeyData())
        {
            if (!table_dst.hasNullKeyData())
            {
                table_dst.hasNullKeyData() = true;
                table_dst.getNullKeyData() = table_src.getNullKeyData();
            }
            else
            {
                for (size_t i = 0; i < params.aggregates_size; ++i)
                    aggregate_functions[i]->merge(
                        table_dst.getNullKeyData() + offsets_of_aggregate_states[i],
                        table_src.getNullKeyData() + offsets_of_aggregate_states[i],
                        arena);

                for (size_t i = 0; i < params.aggregates_size; ++i)
                    aggregate_functions[i]->destroy(
                        table_src.getNullKeyData() + offsets_of_aggregate_states[i]);
            }

            table_src.hasNullKeyData() = false;
            table_src.getNullKeyData() = nullptr;
        }
    }
}
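
/** Merges table_src into table_dst via emplace: if a key is new in table_dst,
  * the source state pointer is simply moved there; otherwise the two states are
  * merged into the destination and the source states are destroyed.
  * Either way, table_src no longer owns any aggregate states afterwards.
  */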
template <typename Method, typename Table>
void NO_INLINE Aggregator::mergeDataImpl(
    Table & table_dst,
    Table & table_src,
    Arena * arena) const
{
    if constexpr (Method::low_cardinality_optimization)
        mergeDataNullKey<Method, Table>(table_dst, table_src, arena);

    table_src.mergeToViaEmplace(table_dst,
        [&](AggregateDataPtr & dst, AggregateDataPtr & src, bool inserted)
    {
        if (!inserted)
        {
            for (size_t i = 0; i < params.aggregates_size; ++i)
                aggregate_functions[i]->merge(
                    dst + offsets_of_aggregate_states[i],
                    src + offsets_of_aggregate_states[i],
                    arena);

            for (size_t i = 0; i < params.aggregates_size; ++i)
                aggregate_functions[i]->destroy(src + offsets_of_aggregate_states[i]);
        }
        else
        {
            dst = src;
        }

        src = nullptr;
    });
    table_src.clearAndShrink();
}
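
/** Variant of the merge for the case when no new keys may be added to table_dst:
  * states whose keys are found are merged in place, all others are merged
  * into the shared 'overflows' state that later becomes the overflow row.
  */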
template <typename Method, typename Table>
void NO_INLINE Aggregator::mergeDataNoMoreKeysImpl(
    Table & table_dst,
    AggregatedDataWithoutKey & overflows,
    Table & table_src,
    Arena * arena) const
{
    /// Note: this will create data for the NULL key if it does not exist yet.
    if constexpr (Method::low_cardinality_optimization)
        mergeDataNullKey<Method, Table>(table_dst, table_src, arena);

    table_src.mergeToViaFind(table_dst, [&](AggregateDataPtr dst, AggregateDataPtr & src, bool found)
    {
        AggregateDataPtr res_data = found ? dst : overflows;

        for (size_t i = 0; i < params.aggregates_size; ++i)
            aggregate_functions[i]->merge(
                res_data + offsets_of_aggregate_states[i],
                src + offsets_of_aggregate_states[i],
                arena);

        for (size_t i = 0; i < params.aggregates_size; ++i)
            aggregate_functions[i]->destroy(src + offsets_of_aggregate_states[i]);

        src = nullptr;
    });
    table_src.clearAndShrink();
}

template <typename Method, typename Table>
void NO_INLINE Aggregator::mergeDataOnlyExistingKeysImpl(
    Table & table_dst,
    Table & table_src,
    Arena * arena) const
{
    /// Note: this will create data for the NULL key if it does not exist yet.
    if constexpr (Method::low_cardinality_optimization)
        mergeDataNullKey<Method, Table>(table_dst, table_src, arena);

    table_src.mergeToViaFind(table_dst,
        [&](AggregateDataPtr dst, AggregateDataPtr & src, bool found)
    {
        if (!found)
            return;

        for (size_t i = 0; i < params.aggregates_size; ++i)
            aggregate_functions[i]->merge(
                dst + offsets_of_aggregate_states[i],
                src + offsets_of_aggregate_states[i],
                arena);

        for (size_t i = 0; i < params.aggregates_size; ++i)
            aggregate_functions[i]->destroy(src + offsets_of_aggregate_states[i]);

        src = nullptr;
    });
    table_src.clearAndShrink();
}

void NO_INLINE Aggregator::mergeWithoutKeyDataImpl(
    ManyAggregatedDataVariants & non_empty_data) const
{
    AggregatedDataVariantsPtr & res = non_empty_data[0];

    /// We merge all aggregation results into the first one.
    for (size_t result_num = 1, size = non_empty_data.size(); result_num < size; ++result_num)
    {
        AggregatedDataWithoutKey & res_data = res->without_key;
        AggregatedDataWithoutKey & current_data = non_empty_data[result_num]->without_key;

        for (size_t i = 0; i < params.aggregates_size; ++i)
            aggregate_functions[i]->merge(res_data + offsets_of_aggregate_states[i], current_data + offsets_of_aggregate_states[i], res->aggregates_pool);

        for (size_t i = 0; i < params.aggregates_size; ++i)
            aggregate_functions[i]->destroy(current_data + offsets_of_aggregate_states[i]);

        current_data = nullptr;
    }
}
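
/** Merges all single-level results into the first one, switching strategy on the fly:
  * while the size limits allow, a plain merge (mergeDataImpl); once checkLimits switches
  * no_more_keys on, new keys go into the overflow row (mergeDataNoMoreKeysImpl) if there
  * is one, or are discarded (mergeDataOnlyExistingKeysImpl) otherwise.
  */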
template <typename Method>
void NO_INLINE Aggregator::mergeSingleLevelDataImpl(
    ManyAggregatedDataVariants & non_empty_data) const
{
    AggregatedDataVariantsPtr & res = non_empty_data[0];
    bool no_more_keys = false;

    /// We merge all aggregation results into the first one.
    for (size_t result_num = 1, size = non_empty_data.size(); result_num < size; ++result_num)
    {
        if (!checkLimits(res->sizeWithoutOverflowRow(), no_more_keys))
            break;

        AggregatedDataVariants & current = *non_empty_data[result_num];

        if (!no_more_keys)
            mergeDataImpl<Method>(
                getDataVariant<Method>(*res).data,
                getDataVariant<Method>(current).data,
                res->aggregates_pool);
        else if (res->without_key)
            mergeDataNoMoreKeysImpl<Method>(
                getDataVariant<Method>(*res).data,
                res->without_key,
                getDataVariant<Method>(current).data,
                res->aggregates_pool);
        else
            mergeDataOnlyExistingKeysImpl<Method>(
                getDataVariant<Method>(*res).data,
                getDataVariant<Method>(current).data,
                res->aggregates_pool);

        /// `current` will not destroy the states of aggregate functions in the destructor.
        current.aggregator = nullptr;
    }
}

template <typename Method>
void NO_INLINE Aggregator::mergeBucketImpl(
    ManyAggregatedDataVariants & data, Int32 bucket, Arena * arena, std::atomic<bool> * is_cancelled) const
{
    /// We merge all aggregation results into the first one.
    AggregatedDataVariantsPtr & res = data[0];
    for (size_t result_num = 1, size = data.size(); result_num < size; ++result_num)
    {
        if (is_cancelled && is_cancelled->load(std::memory_order_seq_cst))
            return;

        AggregatedDataVariants & current = *data[result_num];

        mergeDataImpl<Method>(
            getDataVariant<Method>(*res).data.impls[bucket],
            getDataVariant<Method>(current).data.impls[bucket],
            arena);
    }
}

/** Combines aggregation states together, turns them into blocks, and outputs streams.
  * If the aggregation states are two-level, then it produces blocks strictly in order of 'bucket_num'.
  * (This is important for distributed processing.)
  * In doing so, it can handle different buckets in parallel, using up to `threads` threads.
  */
class MergingAndConvertingBlockInputStream : public IBlockInputStream
{
public:
    /** The input is a set of non-empty sets of partially aggregated data,
      * which are all either single-level, or are two-level.
      */
    MergingAndConvertingBlockInputStream(const Aggregator & aggregator_, ManyAggregatedDataVariants & data_, bool final_, size_t threads_)
        : aggregator(aggregator_), data(data_), final(final_), threads(threads_)
    {
        /// We need at least one arena per thread in the first data item.
        if (!data.empty() && threads > data[0]->aggregates_pools.size())
        {
            Arenas & first_pool = data[0]->aggregates_pools;
            for (size_t j = first_pool.size(); j < threads; ++j)
                first_pool.emplace_back(std::make_shared<Arena>());
        }
    }

    String getName() const override { return "MergingAndConverting"; }

    Block getHeader() const override { return aggregator.getHeader(final); }

    ~MergingAndConvertingBlockInputStream() override
    {
        LOG_TRACE(&Logger::get(__PRETTY_FUNCTION__), "Waiting for threads to finish");

        /// We need to wait for threads to finish before destructor of 'parallel_merge_data',
        /// because the threads access 'parallel_merge_data'.
        if (parallel_merge_data)
            parallel_merge_data->pool.wait();
    }

protected:
    Block readImpl() override
    {
        if (data.empty())
            return {};

        if (current_bucket_num >= NUM_BUCKETS)
            return {};

        AggregatedDataVariantsPtr & first = data[0];
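
        /// current_bucket_num == -1 means the very first read: merge and emit the
        /// 'without_key' (or overflow-row) data, if any, before any per-bucket blocks.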
        if (current_bucket_num == -1)
        {
            ++current_bucket_num;

            if (first->type == AggregatedDataVariants::Type::without_key || aggregator.params.overflow_row)
            {
                aggregator.mergeWithoutKeyDataImpl(data);
                return aggregator.prepareBlockAndFillWithoutKey(
                    *first, final, first->type != AggregatedDataVariants::Type::without_key);
            }
        }

        if (!first->isTwoLevel())
        {
            if (current_bucket_num > 0)
                return {};

            if (first->type == AggregatedDataVariants::Type::without_key)
                return {};

            ++current_bucket_num;

        #define M(NAME) \
            else if (first->type == AggregatedDataVariants::Type::NAME) \
                aggregator.mergeSingleLevelDataImpl<decltype(first->NAME)::element_type>(data);
            if (false) {}
            APPLY_FOR_VARIANTS_SINGLE_LEVEL(M)
        #undef M
            else
                throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);

            return aggregator.prepareBlockAndFillSingleLevel(*first, final);
        }
        else
        {
            if (!parallel_merge_data)
            {
                parallel_merge_data = std::make_unique<ParallelMergeData>(threads);
                for (size_t i = 0; i < threads; ++i)
                    scheduleThreadForNextBucket();
            }

            Block res;

            while (true)
            {
                std::unique_lock lock(parallel_merge_data->mutex);

                if (parallel_merge_data->exception)
                    std::rethrow_exception(parallel_merge_data->exception);

                auto it = parallel_merge_data->ready_blocks.find(current_bucket_num);
                if (it != parallel_merge_data->ready_blocks.end())
                {
                    ++current_bucket_num;
                    scheduleThreadForNextBucket();

                    if (it->second)
                    {
                        res.swap(it->second);
                        break;
                    }
                    else if (current_bucket_num >= NUM_BUCKETS)
                        break;
                }

                parallel_merge_data->condvar.wait(lock);
            }

            return res;
        }
    }

private:
    const Aggregator & aggregator;
    ManyAggregatedDataVariants data;
    bool final;
    size_t threads;

    Int32 current_bucket_num = -1;
    Int32 max_scheduled_bucket_num = -1;
    static constexpr Int32 NUM_BUCKETS = 256;

    struct ParallelMergeData
    {
        std::map<Int32, Block> ready_blocks;
        std::exception_ptr exception;
        std::mutex mutex;
        std::condition_variable condvar;
        ThreadPool pool;

        explicit ParallelMergeData(size_t threads_) : pool(threads_) {}
    };

    std::unique_ptr<ParallelMergeData> parallel_merge_data;

    void scheduleThreadForNextBucket()
    {
        ++max_scheduled_bucket_num;
        if (max_scheduled_bucket_num >= NUM_BUCKETS)
            return;

        parallel_merge_data->pool.scheduleOrThrowOnError(std::bind(&MergingAndConvertingBlockInputStream::thread, this,
            max_scheduled_bucket_num, CurrentThread::getGroup()));
    }

    void thread(Int32 bucket_num, ThreadGroupStatusPtr thread_group)
    {
        try
        {
            setThreadName("MergingAggregtd");
            if (thread_group)
                CurrentThread::attachToIfDetached(thread_group);
            CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};

            /// TODO: add no_more_keys support maybe

            auto & merged_data = *data[0];
            auto method = merged_data.type;
            Block block;

            /// Select Arena to avoid race conditions
            size_t thread_number = static_cast<size_t>(bucket_num) % threads;
            Arena * arena = merged_data.aggregates_pools.at(thread_number).get();

            if (false) {}
        #define M(NAME) \
            else if (method == AggregatedDataVariants::Type::NAME) \
            { \
                aggregator.mergeBucketImpl<decltype(merged_data.NAME)::element_type>(data, bucket_num, arena); \
                block = aggregator.convertOneBucketToBlock(merged_data, *merged_data.NAME, final, bucket_num); \
            }
            APPLY_FOR_VARIANTS_TWO_LEVEL(M)
        #undef M

            std::lock_guard lock(parallel_merge_data->mutex);
            parallel_merge_data->ready_blocks[bucket_num] = std::move(block);
        }
        catch (...)
        {
            std::lock_guard lock(parallel_merge_data->mutex);
            if (!parallel_merge_data->exception)
                parallel_merge_data->exception = std::current_exception();
        }

        parallel_merge_data->condvar.notify_all();
    }
};

ManyAggregatedDataVariants Aggregator::prepareVariantsToMerge(ManyAggregatedDataVariants & data_variants) const
{
    if (data_variants.empty())
        throw Exception("Empty data passed to Aggregator::mergeAndConvertToBlocks.", ErrorCodes::EMPTY_DATA_PASSED);

    LOG_TRACE(log, "Merging aggregated data");

    ManyAggregatedDataVariants non_empty_data;
    non_empty_data.reserve(data_variants.size());
    for (auto & data : data_variants)
        if (!data->empty())
            non_empty_data.push_back(data);

    if (non_empty_data.empty())
        return {};

    if (non_empty_data.size() > 1)
    {
        /// Sort the states in descending order so that the merge is more efficient (since all states are merged into the first).
        std::sort(non_empty_data.begin(), non_empty_data.end(),
            [](const AggregatedDataVariantsPtr & lhs, const AggregatedDataVariantsPtr & rhs)
            {
                return lhs->sizeWithoutOverflowRow() > rhs->sizeWithoutOverflowRow();
            });
    }

    /// If at least one of the variants is two-level, then convert all the variants to two-level, if they are not already.
    /// Note - perhaps it would be more optimal not to convert single-level variants before the merge, but to merge them separately, at the end.
    bool has_at_least_one_two_level = false;
    for (const auto & variant : non_empty_data)
    {
        if (variant->isTwoLevel())
        {
            has_at_least_one_two_level = true;
            break;
        }
    }

    if (has_at_least_one_two_level)
        for (auto & variant : non_empty_data)
            if (!variant->isTwoLevel())
                variant->convertToTwoLevel();

    AggregatedDataVariantsPtr & first = non_empty_data[0];

    for (size_t i = 1, size = non_empty_data.size(); i < size; ++i)
    {
        if (first->type != non_empty_data[i]->type)
            throw Exception("Cannot merge different aggregated data variants.", ErrorCodes::CANNOT_MERGE_DIFFERENT_AGGREGATED_DATA_VARIANTS);

        /** Elements from the remaining sets can be moved to the first data set.
          * Therefore, it must own all the arenas of all other sets.
          */
        first->aggregates_pools.insert(first->aggregates_pools.end(),
            non_empty_data[i]->aggregates_pools.begin(), non_empty_data[i]->aggregates_pools.end());
    }

    return non_empty_data;
}

std::unique_ptr<IBlockInputStream> Aggregator::mergeAndConvertToBlocks(
    ManyAggregatedDataVariants & data_variants, bool final, size_t max_threads) const
{
    ManyAggregatedDataVariants non_empty_data = prepareVariantsToMerge(data_variants);

    if (non_empty_data.empty())
        return std::make_unique<NullBlockInputStream>(getHeader(final));

    return std::make_unique<MergingAndConvertingBlockInputStream>(*this, non_empty_data, final, max_threads);
}
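
/** A sketch of typical use of mergeAndConvertToBlocks (illustrative, not lifted from a specific call site):
  *
  *     auto stream = aggregator.mergeAndConvertToBlocks(many_variants, final, max_threads);
  *     while (Block block = stream->read())
  *         ...;  /// For two-level data, blocks arrive in ascending 'bucket_num' order.
  */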

template <bool no_more_keys, typename Method, typename Table>
void NO_INLINE Aggregator::mergeStreamsImplCase(
    Block & block,
    Arena * aggregates_pool,
    Method & method [[maybe_unused]],
    Table & data,
    AggregateDataPtr overflow_row) const
{
    ColumnRawPtrs key_columns(params.keys_size);
    AggregateColumnsConstData aggregate_columns(params.aggregates_size);

    /// Remember the columns we will work with
    for (size_t i = 0; i < params.keys_size; ++i)
        key_columns[i] = block.safeGetByPosition(i).column.get();

    for (size_t i = 0; i < params.aggregates_size; ++i)
        aggregate_columns[i] = &typeid_cast<const ColumnAggregateFunction &>(*block.safeGetByPosition(params.keys_size + i).column).getData();

    typename Method::State state(key_columns, key_sizes, aggregation_state_cache);

    /// For all rows.
    size_t rows = block.rows();
    for (size_t i = 0; i < rows; ++i)
    {
        AggregateDataPtr aggregate_data = nullptr;

        if (!no_more_keys)
        {
            auto emplace_result = state.emplaceKey(data, i, *aggregates_pool);
            if (emplace_result.isInserted())
            {
                emplace_result.setMapped(nullptr);

                aggregate_data = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
                createAggregateStates(aggregate_data);

                emplace_result.setMapped(aggregate_data);
            }
            else
                aggregate_data = emplace_result.getMapped();
        }
        else
        {
            auto find_result = state.findKey(data, i, *aggregates_pool);
            if (find_result.isFound())
                aggregate_data = find_result.getMapped();
        }

        /// aggregate_data == nullptr means that the new key did not fit in the hash table because of no_more_keys.

        /// If the key does not fit, and the data does not need to be aggregated into a separate row, then there's nothing to do.
        if (!aggregate_data && !overflow_row)
            continue;

        AggregateDataPtr value = aggregate_data ? aggregate_data : overflow_row;

        /// Merge state of aggregate functions.
        for (size_t j = 0; j < params.aggregates_size; ++j)
            aggregate_functions[j]->merge(
                value + offsets_of_aggregate_states[j],
                (*aggregate_columns[j])[i],
                aggregates_pool);
    }

    /// Early release memory.
    block.clear();
}
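
/// Dispatches the run-time 'no_more_keys' flag to the compile-time template parameter,
/// so the per-row branch in mergeStreamsImplCase is resolved at compile time.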
template <typename Method, typename Table>
void NO_INLINE Aggregator::mergeStreamsImpl(
    Block & block,
    Arena * aggregates_pool,
    Method & method,
    Table & data,
    AggregateDataPtr overflow_row,
    bool no_more_keys) const
{
    if (!no_more_keys)
        mergeStreamsImplCase<false>(block, aggregates_pool, method, data, overflow_row);
    else
        mergeStreamsImplCase<true>(block, aggregates_pool, method, data, overflow_row);
}

void NO_INLINE Aggregator::mergeWithoutKeyStreamsImpl(
    Block & block,
    AggregatedDataVariants & result) const
{
    AggregateColumnsConstData aggregate_columns(params.aggregates_size);

    /// Remember the columns we will work with
    for (size_t i = 0; i < params.aggregates_size; ++i)
        aggregate_columns[i] = &typeid_cast<const ColumnAggregateFunction &>(*block.safeGetByPosition(params.keys_size + i).column).getData();

    AggregatedDataWithoutKey & res = result.without_key;
    if (!res)
    {
        AggregateDataPtr place = result.aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
        createAggregateStates(place);
        res = place;
    }

    /// Adding values
    for (size_t i = 0; i < params.aggregates_size; ++i)
        aggregate_functions[i]->merge(res + offsets_of_aggregate_states[i], (*aggregate_columns[i])[0], result.aggregates_pool);

    /// Early release memory.
    block.clear();
}

void Aggregator::mergeStream(const BlockInputStreamPtr & stream, AggregatedDataVariants & result, size_t max_threads)
{
    if (isCancelled())
        return;

    /** If the remote servers used a two-level aggregation method,
      * then the blocks will contain information about the number of the bucket.
      * Then the calculations can be parallelized by buckets.
      * We decompose the blocks into the bucket numbers indicated in them.
      */
    BucketToBlocks bucket_to_blocks;

    /// Read all the data.
    LOG_TRACE(log, "Reading blocks of partially aggregated data.");

    size_t total_input_rows = 0;
    size_t total_input_blocks = 0;
    while (Block block = stream->read())
    {
        if (isCancelled())
            return;

        total_input_rows += block.rows();
        ++total_input_blocks;
        bucket_to_blocks[block.info.bucket_num].emplace_back(std::move(block));
    }

    LOG_TRACE(log, "Read " << total_input_blocks << " blocks of partially aggregated data, total " << total_input_rows
        << " rows.");

    mergeBlocks(bucket_to_blocks, result, max_threads);
}
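
/** Merges the pre-bucketed blocks: buckets with a non-negative number (two-level data)
  * are merged in parallel, then the blocks under bucket -1 (single-level and overflow
  * data) are merged sequentially with no_more_keys support.
  */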
void Aggregator::mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVariants & result, size_t max_threads)
{
    if (bucket_to_blocks.empty())
        return;

    UInt64 total_input_rows = 0;
    for (auto & bucket : bucket_to_blocks)
        for (auto & block : bucket.second)
            total_input_rows += block.rows();

    /** `minus one` means the absence of information about the bucket
      * - in the case of single-level aggregation, as well as for blocks with "overflowing" values.
      * If there is at least one block with a bucket number greater than or equal to zero, then there was a two-level aggregation.
      */
    auto max_bucket = bucket_to_blocks.rbegin()->first;
    bool has_two_level = max_bucket >= 0;

    if (has_two_level)
    {
    #define M(NAME) \
        if (method_chosen == AggregatedDataVariants::Type::NAME) \
            method_chosen = AggregatedDataVariants::Type::NAME ## _two_level;

        APPLY_FOR_VARIANTS_CONVERTIBLE_TO_TWO_LEVEL(M)

    #undef M
    }

    if (isCancelled())
        return;

    /// result will destroy the states of aggregate functions in the destructor
    result.aggregator = this;

    result.init(method_chosen);
    result.keys_size = params.keys_size;
    result.key_sizes = key_sizes;

    bool has_blocks_with_unknown_bucket = bucket_to_blocks.count(-1);

    /// First, parallelize the merge for the individual buckets. Then continue with the data not assigned to any bucket.
    if (has_two_level)
    {
        /** In this case, no_more_keys is not supported due to the fact that
          * from different threads it is difficult to update the general state for "other" keys (overflows).
          * That is, in the end the number of keys can be significantly larger than max_rows_to_group_by.
          */

        LOG_TRACE(log, "Merging partially aggregated two-level data.");

        auto merge_bucket = [&bucket_to_blocks, &result, this](Int32 bucket, Arena * aggregates_pool, ThreadGroupStatusPtr thread_group)
        {
            if (thread_group)
                CurrentThread::attachToIfDetached(thread_group);

            for (Block & block : bucket_to_blocks[bucket])
            {
                if (isCancelled())
                    return;

            #define M(NAME) \
                else if (result.type == AggregatedDataVariants::Type::NAME) \
                    mergeStreamsImpl(block, aggregates_pool, *result.NAME, result.NAME->data.impls[bucket], nullptr, false);

                if (false) {}
                APPLY_FOR_VARIANTS_TWO_LEVEL(M)
            #undef M
                else
                    throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
            }
        };

        std::unique_ptr<ThreadPool> thread_pool;
        if (max_threads > 1 && total_input_rows > 100000)    /// TODO Make a custom threshold.
            thread_pool = std::make_unique<ThreadPool>(max_threads);

        for (const auto & bucket_blocks : bucket_to_blocks)
        {
            const auto bucket = bucket_blocks.first;

            if (bucket == -1)
                continue;

            result.aggregates_pools.push_back(std::make_shared<Arena>());
            Arena * aggregates_pool = result.aggregates_pools.back().get();

            auto task = std::bind(merge_bucket, bucket, aggregates_pool, CurrentThread::getGroup());

            if (thread_pool)
                thread_pool->scheduleOrThrowOnError(task);
            else
                task();
        }

        if (thread_pool)
            thread_pool->wait();

        LOG_TRACE(log, "Merged partially aggregated two-level data.");
    }

    if (isCancelled())
    {
        result.invalidate();
        return;
    }

    if (has_blocks_with_unknown_bucket)
    {
        LOG_TRACE(log, "Merging partially aggregated single-level data.");

        bool no_more_keys = false;

        BlocksList & blocks = bucket_to_blocks[-1];
        for (Block & block : blocks)
        {
            if (isCancelled())
            {
                result.invalidate();
                return;
            }

            if (!checkLimits(result.sizeWithoutOverflowRow(), no_more_keys))
                break;

            if (result.type == AggregatedDataVariants::Type::without_key || block.info.is_overflows)
                mergeWithoutKeyStreamsImpl(block, result);

        #define M(NAME, IS_TWO_LEVEL) \
            else if (result.type == AggregatedDataVariants::Type::NAME) \
                mergeStreamsImpl(block, result.aggregates_pool, *result.NAME, result.NAME->data, result.without_key, no_more_keys);

            APPLY_FOR_AGGREGATED_VARIANTS(M)
        #undef M
            else if (result.type != AggregatedDataVariants::Type::without_key)
                throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
        }

        LOG_TRACE(log, "Merged partially aggregated single-level data.");
    }
}

Block Aggregator::mergeBlocks(BlocksList & blocks, bool final)
{
    if (blocks.empty())
        return {};

    auto bucket_num = blocks.front().info.bucket_num;
    bool is_overflows = blocks.front().info.is_overflows;

    LOG_TRACE(log, "Merging partially aggregated blocks (bucket = " << bucket_num << ").");
    Stopwatch watch;

    /** If possible, change 'method' to some_hash64. Otherwise, leave as is.
      * Better hash function is needed because during external aggregation,
      * we may merge partitions of data with total number of keys far greater than 4 billion.
      */
    auto merge_method = method_chosen;

#define APPLY_FOR_VARIANTS_THAT_MAY_USE_BETTER_HASH_FUNCTION(M) \
        M(key64)            \
        M(key_string)       \
        M(key_fixed_string) \
        M(keys128)          \
        M(keys256)          \
        M(serialized)       \

#define M(NAME) \
    if (merge_method == AggregatedDataVariants::Type::NAME) \
        merge_method = AggregatedDataVariants::Type::NAME ## _hash64; \

    APPLY_FOR_VARIANTS_THAT_MAY_USE_BETTER_HASH_FUNCTION(M)
#undef M

#undef APPLY_FOR_VARIANTS_THAT_MAY_USE_BETTER_HASH_FUNCTION

    /// Temporary data for aggregation.
    AggregatedDataVariants result;

    /// result will destroy the states of aggregate functions in the destructor
    result.aggregator = this;

    result.init(merge_method);
    result.keys_size = params.keys_size;
    result.key_sizes = key_sizes;

    for (Block & block : blocks)
    {
        if (isCancelled())
            return {};

        if (bucket_num >= 0 && block.info.bucket_num != bucket_num)
            bucket_num = -1;

        if (result.type == AggregatedDataVariants::Type::without_key || is_overflows)
            mergeWithoutKeyStreamsImpl(block, result);

    #define M(NAME, IS_TWO_LEVEL) \
        else if (result.type == AggregatedDataVariants::Type::NAME) \
            mergeStreamsImpl(block, result.aggregates_pool, *result.NAME, result.NAME->data, nullptr, false);

        APPLY_FOR_AGGREGATED_VARIANTS(M)
    #undef M
        else if (result.type != AggregatedDataVariants::Type::without_key)
            throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
    }

    if (isCancelled())
        return {};

    Block block;
    if (result.type == AggregatedDataVariants::Type::without_key || is_overflows)
        block = prepareBlockAndFillWithoutKey(result, final, is_overflows);
    else
        block = prepareBlockAndFillSingleLevel(result, final);
    /// NOTE: two-level data is not possible here - chooseAggregationMethod chooses only among single-level methods.

    if (!final)
    {
        /// Pass ownership of aggregate function states from result to ColumnAggregateFunction objects in the resulting block.
        result.aggregator = nullptr;
    }

    size_t rows = block.rows();
    size_t bytes = block.bytes();
    double elapsed_seconds = watch.elapsedSeconds();
    LOG_TRACE(log, std::fixed << std::setprecision(3)
        << "Merged partially aggregated blocks. "
        << rows << " rows, " << bytes / 1048576.0 << " MiB."
        << " in " << elapsed_seconds << " sec."
        << " (" << rows / elapsed_seconds << " rows/sec., " << bytes / elapsed_seconds / 1048576.0 << " MiB/sec.)");

    if (isCancelled())
        return {};

    block.info.bucket_num = bucket_num;
    return block;
}

template <typename Method>
void NO_INLINE Aggregator::convertBlockToTwoLevelImpl(
    Method & method,
    Arena * pool,
    ColumnRawPtrs & key_columns,
    const Block & source,
    std::vector<Block> & destinations) const
{
    typename Method::State state(key_columns, key_sizes, aggregation_state_cache);

    size_t rows = source.rows();
    size_t columns = source.columns();

    /// Create a 'selector' that will contain bucket index for every row. It will be used to scatter rows to buckets.
    IColumn::Selector selector(rows);

    /// For every row.
    for (size_t i = 0; i < rows; ++i)
    {
        if constexpr (Method::low_cardinality_optimization)
        {
            if (state.isNullAt(i))
            {
                selector[i] = 0;
                continue;
            }
        }

        /// Calculate bucket number from row hash.
        auto hash = state.getHash(method.data, i, *pool);
        auto bucket = method.data.getBucketFromHash(hash);

        selector[i] = bucket;
    }

    size_t num_buckets = destinations.size();

    for (size_t column_idx = 0; column_idx < columns; ++column_idx)
    {
        const ColumnWithTypeAndName & src_col = source.getByPosition(column_idx);
        MutableColumns scattered_columns = src_col.column->scatter(num_buckets, selector);

        for (size_t bucket = 0, size = num_buckets; bucket < size; ++bucket)
        {
            if (!scattered_columns[bucket]->empty())
            {
                Block & dst = destinations[bucket];
                dst.info.bucket_num = bucket;
                dst.insert({std::move(scattered_columns[bucket]), src_col.type, src_col.name});
            }

            /** Inserted columns of type ColumnAggregateFunction will own states of aggregate functions
              * by holding shared_ptr to source column. See ColumnAggregateFunction.h
              */
        }
    }
}
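
/// Splits a single-level block into per-bucket blocks, using the same hash-based bucketing
/// as the two-level hash tables, so the pieces can later be merged bucket by bucket.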
std::vector<Block> Aggregator::convertBlockToTwoLevel(const Block & block)
{
    if (!block)
        return {};

    AggregatedDataVariants data;

    ColumnRawPtrs key_columns(params.keys_size);

    /// Remember the columns we will work with
    for (size_t i = 0; i < params.keys_size; ++i)
        key_columns[i] = block.safeGetByPosition(i).column.get();

    AggregatedDataVariants::Type type = method_chosen;
    data.keys_size = params.keys_size;
    data.key_sizes = key_sizes;

#define M(NAME) \
    else if (type == AggregatedDataVariants::Type::NAME) \
        type = AggregatedDataVariants::Type::NAME ## _two_level;

    if (false) {}
    APPLY_FOR_VARIANTS_CONVERTIBLE_TO_TWO_LEVEL(M)
#undef M
    else
        throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);

    data.init(type);

    size_t num_buckets = 0;

#define M(NAME) \
    else if (data.type == AggregatedDataVariants::Type::NAME) \
        num_buckets = data.NAME->data.NUM_BUCKETS;

    if (false) {}
    APPLY_FOR_VARIANTS_TWO_LEVEL(M)
#undef M
    else
        throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);

    std::vector<Block> splitted_blocks(num_buckets);

#define M(NAME) \
    else if (data.type == AggregatedDataVariants::Type::NAME) \
        convertBlockToTwoLevelImpl(*data.NAME, data.aggregates_pool, \
            key_columns, block, splitted_blocks);

    if (false) {}
    APPLY_FOR_VARIANTS_TWO_LEVEL(M)
#undef M
    else
        throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);

    return splitted_blocks;
}

template <typename Method, typename Table>
void NO_INLINE Aggregator::destroyImpl(Table & table) const
{
    table.forEachMapped([&](AggregateDataPtr & data)
    {
        /** If an exception (usually a lack of memory, the MemoryTracker throws) arose
          * after inserting the key into a hash table, but before creating all states of aggregate functions,
          * then 'data' will be nullptr.
          */
        if (nullptr == data)
            return;

        for (size_t i = 0; i < params.aggregates_size; ++i)
            if (!aggregate_functions[i]->isState())
                aggregate_functions[i]->destroy(data + offsets_of_aggregate_states[i]);

        data = nullptr;
    });
}

void Aggregator::destroyWithoutKey(AggregatedDataVariants & result) const
{
    AggregatedDataWithoutKey & res_data = result.without_key;

    if (nullptr != res_data)
    {
        for (size_t i = 0; i < params.aggregates_size; ++i)
            if (!aggregate_functions[i]->isState())
                aggregate_functions[i]->destroy(res_data + offsets_of_aggregate_states[i]);

        res_data = nullptr;
    }
}

void Aggregator::destroyAllAggregateStates(AggregatedDataVariants & result)
{
    if (result.size() == 0)
        return;

    LOG_TRACE(log, "Destroying aggregate states");

    /// In what data structure is the data aggregated?
    if (result.type == AggregatedDataVariants::Type::without_key || params.overflow_row)
        destroyWithoutKey(result);

#define M(NAME, IS_TWO_LEVEL) \
    else if (result.type == AggregatedDataVariants::Type::NAME) \
        destroyImpl<decltype(result.NAME)::element_type>(result.NAME->data);

    if (false) {}
    APPLY_FOR_AGGREGATED_VARIANTS(M)
#undef M
    else if (result.type != AggregatedDataVariants::Type::without_key)
        throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
}


void Aggregator::setCancellationHook(const CancellationHook cancellation_hook)
{
    isCancelled = cancellation_hook;
}

}