#include <iomanip>
#include <thread>
#include <future>

#include <cxxabi.h>

#include <Common/Stopwatch.h>
#include <Common/setThreadName.h>

#include <DataTypes/DataTypeAggregateFunction.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnTuple.h>
#include <AggregateFunctions/AggregateFunctionCount.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <DataStreams/NullBlockInputStream.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/CompressedWriteBuffer.h>

#include <Interpreters/Aggregator.h>
#include <Common/ClickHouseRevision.h>
#include <Common/MemoryTracker.h>
#include <Common/typeid_cast.h>
#include <Interpreters/config_compile.h>

namespace ProfileEvents
{
    extern const Event ExternalAggregationWritePart;
    extern const Event ExternalAggregationCompressedBytes;
    extern const Event ExternalAggregationUncompressedBytes;
}

namespace CurrentMetrics
{
    extern const Metric QueryThread;
}

namespace DB
{

namespace ErrorCodes
{
    extern const int CANNOT_COMPILE_CODE;
    extern const int TOO_MUCH_ROWS;
    extern const int EMPTY_DATA_PASSED;
    extern const int CANNOT_MERGE_DIFFERENT_AGGREGATED_DATA_VARIANTS;
}

AggregatedDataVariants::~AggregatedDataVariants()
{
    if (aggregator && !aggregator->all_aggregates_has_trivial_destructor)
    {
        try
        {
            aggregator->destroyAllAggregateStates(*this);
        }
        catch (...)
        {
            tryLogCurrentException(__PRETTY_FUNCTION__);
        }
    }
}

void AggregatedDataVariants::convertToTwoLevel()
{
    if (aggregator)
        LOG_TRACE(aggregator->log, "Converting aggregation data to two-level.");

    switch (type)
    {
    #define M(NAME) \
        case Type::NAME: \
            NAME ## _two_level = std::make_unique<decltype(NAME ## _two_level)::element_type>(*NAME); \
            NAME.reset(); \
            type = Type::NAME ## _two_level; \
            break;

        APPLY_FOR_VARIANTS_CONVERTIBLE_TO_TWO_LEVEL(M)

    #undef M

        default:
            throw Exception("Wrong data variant passed.", ErrorCodes::LOGICAL_ERROR);
    }
}

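/** For illustration: for a variant named `key32`, the M macro above expands to roughly
  * the following case:
  *
  *     case Type::key32:
  *         key32_two_level = std::make_unique<decltype(key32_two_level)::element_type>(*key32);
  *         key32.reset();
  *         type = Type::key32_two_level;
  *         break;
  *
  * The two-level variant is constructed from the single-level one, redistributing the
  * existing cells into its buckets by hash.
  */
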
void Aggregator::Params::calculateColumnNumbers(const Block & block)
{
    if (keys.empty() && !key_names.empty())
        for (Names::const_iterator it = key_names.begin(); it != key_names.end(); ++it)
            keys.push_back(block.getPositionByName(*it));

    for (AggregateDescriptions::iterator it = aggregates.begin(); it != aggregates.end(); ++it)
        if (it->arguments.empty() && !it->argument_names.empty())
            for (Names::const_iterator jt = it->argument_names.begin(); jt != it->argument_names.end(); ++jt)
                it->arguments.push_back(block.getPositionByName(*jt));
}

void Aggregator::initialize(const Block & block)
{
    if (isCancelled())
        return;

    std::lock_guard<std::mutex> lock(mutex);

    if (initialized)
        return;

    initialized = true;

    if (current_memory_tracker)
        memory_usage_before_aggregation = current_memory_tracker->get();

    aggregate_functions.resize(params.aggregates_size);
    for (size_t i = 0; i < params.aggregates_size; ++i)
        aggregate_functions[i] = params.aggregates[i].function.get();

    /// Initialize the sizes of aggregation states and their offsets.
    offsets_of_aggregate_states.resize(params.aggregates_size);
    total_size_of_aggregate_states = 0;
    all_aggregates_has_trivial_destructor = true;

    for (size_t i = 0; i < params.aggregates_size; ++i)
    {
        offsets_of_aggregate_states[i] = total_size_of_aggregate_states;
        total_size_of_aggregate_states += params.aggregates[i].function->sizeOfData();

        if (!params.aggregates[i].function->hasTrivialDestructor())
            all_aggregates_has_trivial_destructor = false;
    }

    if (isCancelled())
        return;

    /** Everything below is needed only if a non-empty block was passed.
      * (It is not needed in methods that merge blocks with aggregation states.)
      */
    if (!block)
        return;

    /// Transform names of columns to numbers.
    params.calculateColumnNumbers(block);

    if (isCancelled())
        return;

    /// Create a "header" block, describing the result.
    if (!sample)
    {
        for (size_t i = 0; i < params.keys_size; ++i)
        {
            sample.insert(block.safeGetByPosition(params.keys[i]).cloneEmpty());
            if (auto converted = sample.safeGetByPosition(i).column->convertToFullColumnIfConst())
                sample.safeGetByPosition(i).column = converted;
        }

        for (size_t i = 0; i < params.aggregates_size; ++i)
        {
            ColumnWithTypeAndName col;
            col.name = params.aggregates[i].column_name;

            size_t arguments_size = params.aggregates[i].arguments.size();
            DataTypes argument_types(arguments_size);
            for (size_t j = 0; j < arguments_size; ++j)
                argument_types[j] = block.safeGetByPosition(params.aggregates[i].arguments[j]).type;

            col.type = std::make_shared<DataTypeAggregateFunction>(params.aggregates[i].function, argument_types, params.aggregates[i].parameters);
            col.column = col.type->createColumn();

            sample.insert(std::move(col));
        }
    }
}

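/** For illustration (sizes below are hypothetical): with two aggregate functions, e.g.
  * sum(UInt64) followed by avg(Float64), the states of all functions for one key are laid
  * out contiguously in a single chunk of `total_size_of_aggregate_states` bytes:
  *
  *     offsets_of_aggregate_states[0] == 0   -> state of sum   (sizeOfData() == 8)
  *     offsets_of_aggregate_states[1] == 8   -> state of avg   (sizeOfData() == 16)
  *     total_size_of_aggregate_states == 24
  *
  * A single Arena allocation of this size therefore holds all states for one key.
  */
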
void Aggregator::setSampleBlock(const Block & block)
{
    std::lock_guard<std::mutex> lock(mutex);

    if (!sample)
        sample = block.cloneEmpty();
}

void Aggregator::compileIfPossible(AggregatedDataVariants::Type type)
{
    std::lock_guard<std::mutex> lock(mutex);

    if (compiled_if_possible)
        return;

    compiled_if_possible = true;

    std::string method_typename;
    std::string method_typename_two_level;

    if (false) {}
#define M(NAME) \
    else if (type == AggregatedDataVariants::Type::NAME) \
    { \
        method_typename = "decltype(AggregatedDataVariants::" #NAME ")::element_type"; \
        method_typename_two_level = "decltype(AggregatedDataVariants::" #NAME "_two_level)::element_type"; \
    }

    APPLY_FOR_VARIANTS_CONVERTIBLE_TO_TWO_LEVEL(M)
#undef M

#define M(NAME) \
    else if (type == AggregatedDataVariants::Type::NAME) \
        method_typename = "decltype(AggregatedDataVariants::" #NAME ")::element_type";

    APPLY_FOR_VARIANTS_NOT_CONVERTIBLE_TO_TWO_LEVEL(M)
#undef M

    else if (type == AggregatedDataVariants::Type::without_key) {}
    else
        throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);

    /// List of types of aggregate functions.
    std::stringstream aggregate_functions_typenames_str;
    for (size_t i = 0; i < params.aggregates_size; ++i)
    {
        IAggregateFunction & func = *aggregate_functions[i];

        int status = 0;
        char * type_name_ptr = abi::__cxa_demangle(typeid(func).name(), 0, 0, &status);
        std::string type_name = type_name_ptr;
        free(type_name_ptr);

        if (status)
            throw Exception("Cannot compile code: cannot demangle name " + String(typeid(func).name())
                + ", status: " + toString(status), ErrorCodes::CANNOT_COMPILE_CODE);

        aggregate_functions_typenames_str << ((i != 0) ? ", " : "") << type_name;
    }

    std::string aggregate_functions_typenames = aggregate_functions_typenames_str.str();

    std::stringstream key_str;
    key_str << "Aggregate: ";
    if (!method_typename.empty())
        key_str << method_typename + ", ";
    key_str << aggregate_functions_typenames;
    std::string key = key_str.str();

    auto get_code = [method_typename, method_typename_two_level, aggregate_functions_typenames]
    {
        /// A short piece of code, which is an explicit instantiation of the template.
        std::stringstream code;
        code <<        /// No explicit inclusion of the header file. It is included using the -include compiler option.
            "namespace DB\n"
            "{\n"
            "\n";

        /// There can be up to two instantiations of the template - for the normal and two_level variants.
        auto append_code_for_specialization =
            [&code, &aggregate_functions_typenames] (const std::string & method_typename, const std::string & suffix)
        {
            code <<
                "template void Aggregator::executeSpecialized<\n"
                    "\t" << method_typename << ", TypeList<" << aggregate_functions_typenames << ">>(\n"
                    "\t" << method_typename << " &, Arena *, size_t, ConstColumnPlainPtrs &,\n"
                    "\tAggregateColumns &, const Sizes &, StringRefs &, bool, AggregateDataPtr) const;\n"
                "\n"
                "static void wrapper" << suffix << "(\n"
                    "\tconst Aggregator & aggregator,\n"
                    "\t" << method_typename << " & method,\n"
                    "\tArena * arena,\n"
                    "\tsize_t rows,\n"
                    "\tConstColumnPlainPtrs & key_columns,\n"
                    "\tAggregator::AggregateColumns & aggregate_columns,\n"
                    "\tconst Sizes & key_sizes,\n"
                    "\tStringRefs & keys,\n"
                    "\tbool no_more_keys,\n"
                    "\tAggregateDataPtr overflow_row)\n"
                "{\n"
                    "\taggregator.executeSpecialized<\n"
                        "\t\t" << method_typename << ", TypeList<" << aggregate_functions_typenames << ">>(\n"
                        "\t\tmethod, arena, rows, key_columns, aggregate_columns, key_sizes, keys, no_more_keys, overflow_row);\n"
                "}\n"
                "\n"
                "void * getPtr" << suffix << "() __attribute__((__visibility__(\"default\")));\n"
                "void * getPtr" << suffix << "()\n"    /// Without this wrapper, it's not clear how to get the desired symbol from the compiled library.
                "{\n"
                    "\treturn reinterpret_cast<void *>(&wrapper" << suffix << ");\n"
                "}\n";
        };

        if (!method_typename.empty())
            append_code_for_specialization(method_typename, "");
        else
        {
            /// For the `without_key` method.
            code <<
                "template void Aggregator::executeSpecializedWithoutKey<\n"
                    "\t" << "TypeList<" << aggregate_functions_typenames << ">>(\n"
                    "\tAggregatedDataWithoutKey &, size_t, AggregateColumns &, Arena *) const;\n"
                "\n"
                "static void wrapper(\n"
                    "\tconst Aggregator & aggregator,\n"
                    "\tAggregatedDataWithoutKey & method,\n"
                    "\tsize_t rows,\n"
                    "\tAggregator::AggregateColumns & aggregate_columns,\n"
                    "\tArena * arena)\n"
                "{\n"
                    "\taggregator.executeSpecializedWithoutKey<\n"
                        "\t\tTypeList<" << aggregate_functions_typenames << ">>(\n"
                        "\t\tmethod, rows, aggregate_columns, arena);\n"
                "}\n"
                "\n"
                "void * getPtr() __attribute__((__visibility__(\"default\")));\n"
                "void * getPtr()\n"
                "{\n"
                    "\treturn reinterpret_cast<void *>(&wrapper);\n"
                "}\n";
        }

        if (!method_typename_two_level.empty())
            append_code_for_specialization(method_typename_two_level, "TwoLevel");
        else
        {
            /// The stub.
            code <<
                "void * getPtrTwoLevel() __attribute__((__visibility__(\"default\")));\n"
                "void * getPtrTwoLevel()\n"
                "{\n"
                    "\treturn nullptr;\n"
                "}\n";
        }

        code <<
            "}\n";

        return code.str();
    };

    auto compiled_data_owned_by_callback = compiled_data;
    auto on_ready = [compiled_data_owned_by_callback] (SharedLibraryPtr & lib)
    {
        if (compiled_data_owned_by_callback.unique())    /// The Aggregator has already been destroyed.
            return;

        compiled_data_owned_by_callback->compiled_aggregator = lib;
        compiled_data_owned_by_callback->compiled_method_ptr = lib->get<void * (*)()>("_ZN2DB6getPtrEv")();
        compiled_data_owned_by_callback->compiled_two_level_method_ptr = lib->get<void * (*)()>("_ZN2DB14getPtrTwoLevelEv")();
    };

    /** If the library has already been compiled, a non-null SharedLibraryPtr is returned.
      * If the library has not been compiled yet, the counter is incremented and nullptr is returned.
      * When the counter reaches min_count_to_compile, compilation starts asynchronously (in a separate thread),
      * and the `on_ready` callback is called when it finishes.
      */
    SharedLibraryPtr lib = params.compiler->getOrCount(key, params.min_count_to_compile,
        "-include " INTERNAL_COMPILER_HEADERS "/dbms/src/Interpreters/SpecializedAggregator.h"
        " -Wno-unused-function",
        get_code, on_ready);

    /// If the result is already ready.
    if (lib)
        on_ready(lib);
}

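/** For illustration: compilation is keyed by the pair (aggregation method typename, list of
  * aggregate function typenames), so the cache key could look roughly like
  *
  *     "Aggregate: decltype(AggregatedDataVariants::key64)::element_type, DB::AggregateFunctionSum<...>"
  *
  * (the exact demangled names vary). The same specialized library is thus reused by all
  * queries of the same shape, and the compilation cost is paid only after
  * min_count_to_compile identical requests.
  */
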
AggregatedDataVariants::Type Aggregator::chooseAggregationMethod(const ConstColumnPlainPtrs & key_columns, Sizes & key_sizes) const
{
    /// Check if at least one of the specified keys is nullable.
    /// Create a set of nested key columns from the corresponding key columns.
    /// Here "nested" means that, if a key column is nullable, we take its nested
    /// column; otherwise we take the key column as is.
    ConstColumnPlainPtrs nested_key_columns;
    nested_key_columns.reserve(key_columns.size());
    bool has_nullable_key = false;

    for (const auto & col : key_columns)
    {
        if (col->isNullable())
        {
            const ColumnNullable & nullable_col = static_cast<const ColumnNullable &>(*col);
            nested_key_columns.push_back(nullable_col.getNestedColumn().get());
            has_nullable_key = true;
        }
        else
            nested_key_columns.push_back(col);
    }

    /** Returns ordinary (not two-level) methods, because we start from them.
      * Later, during the aggregation process, data may be converted (partitioned) to a two-level structure, if cardinality is high.
      */

    bool all_fixed = true;
    size_t keys_bytes = 0;

    size_t num_array_keys = 0;
    bool has_arrays_of_non_fixed_elems = false;
    bool all_non_array_keys_are_fixed = true;
    bool has_tuples = false;
    bool has_arrays_of_nullable = false;

    key_sizes.resize(params.keys_size);
    for (size_t j = 0; j < params.keys_size; ++j)
    {
        if (nested_key_columns[j]->isFixed())
        {
            key_sizes[j] = nested_key_columns[j]->sizeOfField();
            keys_bytes += key_sizes[j];
        }
        else
        {
            all_fixed = false;

            if (const ColumnArray * arr = typeid_cast<const ColumnArray *>(nested_key_columns[j]))
            {
                ++num_array_keys;

                if (arr->getData().isNullable())
                    has_arrays_of_nullable = true;

                if (!arr->getData().isFixed())
                    has_arrays_of_non_fixed_elems = true;
            }
            else
            {
                all_non_array_keys_are_fixed = false;

                if (typeid_cast<const ColumnTuple *>(nested_key_columns[j]))
                    has_tuples = true;
            }
        }
    }

    /// If there are no keys, everything is aggregated into a single row.
    if (params.keys_size == 0)
        return AggregatedDataVariants::Type::without_key;

    if (has_nullable_key || has_arrays_of_nullable)
    {
        /// At least one key is nullable. Therefore we choose an aggregation method
        /// that takes this fact into account.
        if ((params.keys_size == 1) && (nested_key_columns[0]->isNumeric()))
        {
            /// We have exactly one key and it is nullable. We shall add to it a tag
            /// which specifies whether its value is null or not.
            size_t size_of_field = nested_key_columns[0]->sizeOfField();
            if ((size_of_field == 1) || (size_of_field == 2) || (size_of_field == 4) || (size_of_field == 8) || (size_of_field == 16))
                return AggregatedDataVariants::Type::nullable_keys128;
            else
                throw Exception{"Logical error: numeric column has sizeOfField not in 1, 2, 4, 8, 16.",
                    ErrorCodes::LOGICAL_ERROR};
        }

        if (all_fixed)
        {
            /// Pack, if possible, all the keys along with the information about which key values are nulls
            /// into a fixed 16- or 32-byte blob.
            if (keys_bytes > (std::numeric_limits<size_t>::max() - std::tuple_size<KeysNullMap<UInt128>>::value))
                throw Exception{"Aggregator: keys sizes overflow", ErrorCodes::LOGICAL_ERROR};
            if ((std::tuple_size<KeysNullMap<UInt128>>::value + keys_bytes) <= 16)
                return AggregatedDataVariants::Type::nullable_keys128;
            if ((std::tuple_size<KeysNullMap<UInt256>>::value + keys_bytes) <= 32)
                return AggregatedDataVariants::Type::nullable_keys256;
        }

        /// Fallback case.
        return AggregatedDataVariants::Type::serialized;
    }

    /// No key has been found to be nullable.

    /// Single numeric key.
    if ((params.keys_size == 1) && nested_key_columns[0]->isNumericNotNullable())
    {
        size_t size_of_field = nested_key_columns[0]->sizeOfField();
        if (size_of_field == 1)
            return AggregatedDataVariants::Type::key8;
        if (size_of_field == 2)
            return AggregatedDataVariants::Type::key16;
        if (size_of_field == 4)
            return AggregatedDataVariants::Type::key32;
        if (size_of_field == 8)
            return AggregatedDataVariants::Type::key64;
        if (size_of_field == 16)
            return AggregatedDataVariants::Type::keys128;
        throw Exception("Logical error: numeric column has sizeOfField not in 1, 2, 4, 8, 16.", ErrorCodes::LOGICAL_ERROR);
    }

    /// If all keys fit in N bits, we will use a hash table keyed by all keys packed (placed contiguously) into a single N-bit value.
    if (all_fixed && keys_bytes <= 16)
        return AggregatedDataVariants::Type::keys128;
    if (all_fixed && keys_bytes <= 32)
        return AggregatedDataVariants::Type::keys256;

    /// If there is a single string key, we will use a hash table with references to it. The strings themselves are stored separately in an Arena.
    if (params.keys_size == 1 && typeid_cast<const ColumnString *>(nested_key_columns[0]))
        return AggregatedDataVariants::Type::key_string;

    if (params.keys_size == 1 && typeid_cast<const ColumnFixedString *>(nested_key_columns[0]))
        return AggregatedDataVariants::Type::key_fixed_string;

    /** If some keys are arrays:
      * if there is no more than one key that is an array, it is an array of fixed-size elements, and all other keys are fixed-size,
      * then it is possible to use the 'concat' method (due to one-to-one correspondence). Otherwise the method will be 'serialized'.
      */
    if (num_array_keys == 1 && !has_arrays_of_non_fixed_elems && all_non_array_keys_are_fixed)
        return AggregatedDataVariants::Type::concat;

    /** For the case with multiple strings, we use the 'concat' method despite the fact that the correspondence is not one-to-one.
      * Concat will concatenate strings including their zero terminators.
      * But if strings contain zero bytes in between, different keys may clash.
      * For example, keys ('a\0b', 'c') and ('a', 'b\0c') will be aggregated as one key.
      * This is documented behaviour. It may be avoided by switching to the 'serialized' method, which is less efficient.
      *
      * Some of the aggregation keys may be tuples. In most cases, tuples are flattened in the expression analyzer and not passed here.
      * But in rare cases they are not flattened. We fall back to the 'serialized' method for simplicity.
      */
    if (num_array_keys == 0 && !has_tuples)
        return AggregatedDataVariants::Type::concat;

    return AggregatedDataVariants::Type::serialized;
    /// NOTE AggregatedDataVariants::Type::hashed is not used. It's proven to be less efficient than 'serialized' in most cases.
}

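/** Illustrative examples of how key columns map to methods in the code above
  * (a sketch; the exact choice also depends on the key sizes):
  *
  *     GROUP BY UInt64                -> key64
  *     GROUP BY (UInt32, UInt32)      -> keys128  (4 + 4 bytes fit into 16)
  *     GROUP BY Nullable(UInt64)      -> nullable_keys128  (value plus null tag)
  *     GROUP BY String                -> key_string
  *     GROUP BY (String, String)      -> concat   (keys concatenated with zero terminators)
  *     GROUP BY Array(String)         -> serialized
  */
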
void Aggregator::createAggregateStates(AggregateDataPtr & aggregate_data) const
{
    for (size_t j = 0; j < params.aggregates_size; ++j)
    {
        try
        {
            /** An exception may occur if there is a shortage of memory.
              * To ensure that everything is properly destroyed in that case, we "roll back" the states created so far.
              */
            aggregate_functions[j]->create(aggregate_data + offsets_of_aggregate_states[j]);
        }
        catch (...)
        {
            for (size_t rollback_j = 0; rollback_j < j; ++rollback_j)
                aggregate_functions[rollback_j]->destroy(aggregate_data + offsets_of_aggregate_states[rollback_j]);

            throw;
        }
    }
}

/** Interesting: if you remove `noinline`, gcc for some reason inlines this function, and performance decreases (~ 10%).
  * (Probably because, after this function is inlined, more internal functions are no longer inlined.)
  * Inlining does not make sense, since the inner loop is located entirely inside this function.
  */
template <typename Method>
void NO_INLINE Aggregator::executeImpl(
    Method & method,
    Arena * aggregates_pool,
    size_t rows,
    ConstColumnPlainPtrs & key_columns,
    AggregateFunctionInstruction * aggregate_instructions,
    const Sizes & key_sizes,
    StringRefs & keys,
    bool no_more_keys,
    AggregateDataPtr overflow_row) const
{
    typename Method::State state;
    state.init(key_columns);

    if (!no_more_keys)
        executeImplCase<false>(method, state, aggregates_pool, rows, key_columns, aggregate_instructions, key_sizes, keys, overflow_row);
    else
        executeImplCase<true>(method, state, aggregates_pool, rows, key_columns, aggregate_instructions, key_sizes, keys, overflow_row);
}

#ifndef __clang__
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
#endif

template <bool no_more_keys, typename Method>
void NO_INLINE Aggregator::executeImplCase(
    Method & method,
    typename Method::State & state,
    Arena * aggregates_pool,
    size_t rows,
    ConstColumnPlainPtrs & key_columns,
    AggregateFunctionInstruction * aggregate_instructions,
    const Sizes & key_sizes,
    StringRefs & keys,
    AggregateDataPtr overflow_row) const
{
    /// NOTE When editing this code, also pay attention to SpecializedAggregator.h.

    /// For all rows.
    typename Method::iterator it;
    typename Method::Key prev_key;
    for (size_t i = 0; i < rows; ++i)
    {
        bool inserted;          /// Was a new key inserted, or was this key already present?
        bool overflow = false;  /// The new key did not fit in the hash table because of no_more_keys.

        /// Get the key to insert into the hash table.
        typename Method::Key key = state.getKey(key_columns, params.keys_size, i, key_sizes, keys, *aggregates_pool);

        if (!no_more_keys)    /// Insert.
        {
            /// Optimization for consecutive identical keys.
            if (!Method::no_consecutive_keys_optimization)
            {
                if (i != 0 && key == prev_key)
                {
                    /// Add values to the aggregate functions.
                    AggregateDataPtr value = Method::getAggregateData(it->second);
                    for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
                        (*inst->func)(inst->that, value + inst->state_offset, inst->arguments, i, aggregates_pool);

                    method.onExistingKey(key, keys, *aggregates_pool);
                    continue;
                }
                else
                    prev_key = key;
            }

            method.data.emplace(key, it, inserted);
        }
        else
        {
            /// Add only if the key already exists.
            inserted = false;
            it = method.data.find(key);
            if (method.data.end() == it)
                overflow = true;
        }

        /// If the key does not fit, and the data does not need to be aggregated into a separate row, then there's nothing to do.
        if (no_more_keys && overflow && !overflow_row)
        {
            method.onExistingKey(key, keys, *aggregates_pool);
            continue;
        }

        /// If a new key was inserted, initialize the states of the aggregate functions, and possibly something related to the key.
        if (inserted)
        {
            AggregateDataPtr & aggregate_data = Method::getAggregateData(it->second);

            /// Exception safety - if memory allocation or state creation fails, destructors will not be called.
            aggregate_data = nullptr;

            method.onNewKey(*it, params.keys_size, i, keys, *aggregates_pool);

            AggregateDataPtr place = aggregates_pool->alloc(total_size_of_aggregate_states);
            createAggregateStates(place);
            aggregate_data = place;
        }
        else
            method.onExistingKey(key, keys, *aggregates_pool);

        AggregateDataPtr value = (!no_more_keys || !overflow) ? Method::getAggregateData(it->second) : overflow_row;

        /// Add values to the aggregate functions.
        for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
            (*inst->func)(inst->that, value + inst->state_offset, inst->arguments, i, aggregates_pool);
    }
}

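/** For illustration: the consecutive-key optimization above matters for sorted input.
  * With key column values [1, 1, 1, 2, 2, 3], only three emplace() lookups are performed
  * (for 1, 2 and 3); the repeated rows reuse the iterator from the previous row and only
  * add values to the aggregate function states.
  */
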
#ifndef __clang__
#pragma GCC diagnostic pop
#endif

void NO_INLINE Aggregator::executeWithoutKeyImpl(
    AggregatedDataWithoutKey & res,
    size_t rows,
    AggregateFunctionInstruction * aggregate_instructions,
    Arena * arena) const
{
    /// Optimization for the case of a single aggregate function `count`.
    AggregateFunctionCount * agg_count = params.aggregates_size == 1
        ? typeid_cast<AggregateFunctionCount *>(aggregate_functions[0])
        : nullptr;

    if (agg_count)
        agg_count->addDelta(res, rows);
    else
    {
        for (size_t i = 0; i < rows; ++i)
        {
            /// Adding values.
            for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
                (*inst->func)(inst->that, res + inst->state_offset, inst->arguments, i, arena);
        }
    }
}

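/** For illustration: a query like `SELECT count() FROM table` has no keys and a single
  * `count` aggregate, so a whole block of N rows is handled by a single addDelta(res, N)
  * call instead of iterating over the rows one by one.
  */
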
bool Aggregator::executeOnBlock(Block & block, AggregatedDataVariants & result,
    ConstColumnPlainPtrs & key_columns, AggregateColumns & aggregate_columns,
    Sizes & key_sizes, StringRefs & key,
    bool & no_more_keys)
{
    initialize(block);

    if (isCancelled())
        return true;

    /// `result` will destroy the states of aggregate functions in its destructor.
    result.aggregator = this;

    for (size_t i = 0; i < params.aggregates_size; ++i)
        aggregate_columns[i].resize(params.aggregates[i].arguments.size());

    /** Constant columns are not supported directly during aggregation.
      * To make them work anyway, we materialize them.
      */
    Columns materialized_columns;

    /// Remember the columns we will work with.
    for (size_t i = 0; i < params.keys_size; ++i)
    {
        key_columns[i] = block.safeGetByPosition(params.keys[i]).column.get();

        if (auto converted = key_columns[i]->convertToFullColumnIfConst())
        {
            materialized_columns.push_back(converted);
            key_columns[i] = materialized_columns.back().get();
        }
    }

    AggregateFunctionInstructions aggregate_functions_instructions(params.aggregates_size + 1);
    aggregate_functions_instructions[params.aggregates_size].that = nullptr;

    for (size_t i = 0; i < params.aggregates_size; ++i)
    {
        for (size_t j = 0; j < aggregate_columns[i].size(); ++j)
        {
            aggregate_columns[i][j] = block.safeGetByPosition(params.aggregates[i].arguments[j]).column.get();

            if (auto converted = aggregate_columns[i][j]->convertToFullColumnIfConst())
            {
                materialized_columns.push_back(converted);
                aggregate_columns[i][j] = materialized_columns.back().get();
            }
        }

        aggregate_functions_instructions[i].that = aggregate_functions[i];
        aggregate_functions_instructions[i].func = aggregate_functions[i]->getAddressOfAddFunction();
        aggregate_functions_instructions[i].state_offset = offsets_of_aggregate_states[i];
        aggregate_functions_instructions[i].arguments = aggregate_columns[i].data();
    }

    if (isCancelled())
        return true;

    size_t rows = block.rows();

    /// How to perform the aggregation?
    if (result.empty())
    {
        result.init(chooseAggregationMethod(key_columns, key_sizes));
        result.keys_size = params.keys_size;
        result.key_sizes = key_sizes;
        LOG_TRACE(log, "Aggregation method: " << result.getMethodName());

        if (params.compiler)
            compileIfPossible(result.type);
    }

    if (isCancelled())
        return true;

    if ((params.overflow_row || result.type == AggregatedDataVariants::Type::without_key) && !result.without_key)
    {
        AggregateDataPtr place = result.aggregates_pool->alloc(total_size_of_aggregate_states);
        createAggregateStates(place);
        result.without_key = place;
    }

    /// We select one of the aggregation methods and call it.

    /// For the case when there are no keys (everything is aggregated into one row).
    if (result.type == AggregatedDataVariants::Type::without_key)
    {
        /// If there is dynamically compiled code.
        if (compiled_data->compiled_method_ptr)
        {
            reinterpret_cast<
                void (*)(const Aggregator &, AggregatedDataWithoutKey &, size_t, AggregateColumns &, Arena *)>
                    (compiled_data->compiled_method_ptr)(*this, result.without_key, rows, aggregate_columns, result.aggregates_pool);
        }
        else
            executeWithoutKeyImpl(result.without_key, rows, &aggregate_functions_instructions[0], result.aggregates_pool);
    }
    else
    {
        /// This is where the data that does not fit in `max_rows_to_group_by` with `group_by_overflow_mode = any` is written.
        AggregateDataPtr overflow_row_ptr = params.overflow_row ? result.without_key : nullptr;

        bool is_two_level = result.isTwoLevel();

        /// Compiled code, for the normal structure.
        if (!is_two_level && compiled_data->compiled_method_ptr)
        {
        #define M(NAME, IS_TWO_LEVEL) \
            else if (result.type == AggregatedDataVariants::Type::NAME) \
                reinterpret_cast<void (*)( \
                    const Aggregator &, decltype(result.NAME)::element_type &, \
                    Arena *, size_t, ConstColumnPlainPtrs &, AggregateColumns &, \
                    const Sizes &, StringRefs &, bool, AggregateDataPtr)>(compiled_data->compiled_method_ptr) \
                (*this, *result.NAME, result.aggregates_pool, rows, key_columns, aggregate_columns, \
                    result.key_sizes, key, no_more_keys, overflow_row_ptr);

            if (false) {}
            APPLY_FOR_AGGREGATED_VARIANTS(M)
        #undef M
        }
        /// Compiled code, for the two-level structure.
        else if (is_two_level && compiled_data->compiled_two_level_method_ptr)
        {
        #define M(NAME) \
            else if (result.type == AggregatedDataVariants::Type::NAME) \
                reinterpret_cast<void (*)( \
                    const Aggregator &, decltype(result.NAME)::element_type &, \
                    Arena *, size_t, ConstColumnPlainPtrs &, AggregateColumns &, \
                    const Sizes &, StringRefs &, bool, AggregateDataPtr)>(compiled_data->compiled_two_level_method_ptr) \
                (*this, *result.NAME, result.aggregates_pool, rows, key_columns, aggregate_columns, \
                    result.key_sizes, key, no_more_keys, overflow_row_ptr);

            if (false) {}
            APPLY_FOR_VARIANTS_TWO_LEVEL(M)
        #undef M
        }
        /// When there is no dynamically compiled code.
        else
        {
        #define M(NAME, IS_TWO_LEVEL) \
            else if (result.type == AggregatedDataVariants::Type::NAME) \
                executeImpl(*result.NAME, result.aggregates_pool, rows, key_columns, &aggregate_functions_instructions[0], \
                    result.key_sizes, key, no_more_keys, overflow_row_ptr);

            if (false) {}
            APPLY_FOR_AGGREGATED_VARIANTS(M)
        #undef M
        }
    }

    size_t result_size = result.sizeWithoutOverflowRow();
    Int64 current_memory_usage = 0;
    if (current_memory_tracker)
        current_memory_usage = current_memory_tracker->get();

    auto result_size_bytes = current_memory_usage - memory_usage_before_aggregation;    /// The sum here accounts for results from different threads.

    bool worth_convert_to_two_level
        = (params.group_by_two_level_threshold && result_size >= params.group_by_two_level_threshold)
        || (params.group_by_two_level_threshold_bytes && result_size_bytes >= static_cast<Int64>(params.group_by_two_level_threshold_bytes));

    /** Converting to a two-level data structure.
      * It allows the subsequent merge to be effective - either economical in memory or parallel.
      */
    if (result.isConvertibleToTwoLevel() && worth_convert_to_two_level)
        result.convertToTwoLevel();

    /// Checking the constraints.
    if (!checkLimits(result_size, no_more_keys))
        return false;

    /** Flush data to disk if too much RAM is consumed.
      * Data can only be flushed to disk if a two-level aggregation structure is used.
      */
    if (params.max_bytes_before_external_group_by
        && result.isTwoLevel()
        && current_memory_usage > static_cast<Int64>(params.max_bytes_before_external_group_by)
        && worth_convert_to_two_level)
    {
        writeToTemporaryFile(result);
    }

    return true;
}

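/** For illustration (the threshold values below are hypothetical): with
  *
  *     group_by_two_level_threshold       = 100000 keys
  *     group_by_two_level_threshold_bytes = 100 MB
  *     max_bytes_before_external_group_by = 10 GB
  *
  * the hash table is converted to two-level as soon as either of the first two thresholds
  * is crossed, and once the tracked memory usage exceeds the third, the accumulated
  * two-level data is flushed to a temporary file after the current block.
  */
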
void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants)
{
    Stopwatch watch;
    size_t rows = data_variants.size();

    auto file = std::make_unique<Poco::TemporaryFile>(params.tmp_path);
    const std::string & path = file->path();
    WriteBufferFromFile file_buf(path);
    CompressedWriteBuffer compressed_buf(file_buf);
    NativeBlockOutputStream block_out(compressed_buf, ClickHouseRevision::get());

    LOG_DEBUG(log, "Writing part of aggregation data into temporary file " << path << ".");
    ProfileEvents::increment(ProfileEvents::ExternalAggregationWritePart);

    /// Flush only two-level data and possibly the overflow data.

#define M(NAME) \
    else if (data_variants.type == AggregatedDataVariants::Type::NAME) \
        writeToTemporaryFileImpl(data_variants, *data_variants.NAME, block_out, path);

    if (false) {}
    APPLY_FOR_VARIANTS_TWO_LEVEL(M)
#undef M
    else
        throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);

    /// NOTE Instead of freeing memory and creating new hash tables and arenas, you could re-use the old ones.
    data_variants.init(data_variants.type);
    data_variants.aggregates_pools = Arenas(1, std::make_shared<Arena>());
    data_variants.aggregates_pool = data_variants.aggregates_pools.back().get();
    data_variants.without_key = nullptr;

    block_out.flush();
    compressed_buf.next();
    file_buf.next();

    double elapsed_seconds = watch.elapsedSeconds();
    double compressed_bytes = file_buf.count();
    double uncompressed_bytes = compressed_buf.count();

    {
        std::lock_guard<std::mutex> lock(temporary_files.mutex);
        temporary_files.files.emplace_back(std::move(file));
        temporary_files.sum_size_uncompressed += uncompressed_bytes;
        temporary_files.sum_size_compressed += compressed_bytes;
    }

    ProfileEvents::increment(ProfileEvents::ExternalAggregationCompressedBytes, compressed_bytes);
    ProfileEvents::increment(ProfileEvents::ExternalAggregationUncompressedBytes, uncompressed_bytes);

    LOG_TRACE(log, std::fixed << std::setprecision(3)
        << "Written part in " << elapsed_seconds << " sec., "
        << rows << " rows, "
        << (uncompressed_bytes / 1048576.0) << " MiB uncompressed, "
        << (compressed_bytes / 1048576.0) << " MiB compressed, "
        << (uncompressed_bytes / rows) << " uncompressed bytes per row, "
        << (compressed_bytes / rows) << " compressed bytes per row, "
        << "compression rate: " << (uncompressed_bytes / compressed_bytes)
        << " (" << (rows / elapsed_seconds) << " rows/sec., "
        << (uncompressed_bytes / elapsed_seconds / 1048576.0) << " MiB/sec. uncompressed, "
        << (compressed_bytes / elapsed_seconds / 1048576.0) << " MiB/sec. compressed)");
}

template <typename Method>
Block Aggregator::convertOneBucketToBlock(
    AggregatedDataVariants & data_variants,
    Method & method,
    bool final,
    size_t bucket) const
{
    Block block = prepareBlockAndFill(data_variants, final, method.data.impls[bucket].size(),
        [bucket, &method, this] (
            ColumnPlainPtrs & key_columns,
            AggregateColumnsData & aggregate_columns,
            ColumnPlainPtrs & final_aggregate_columns,
            const Sizes & key_sizes,
            bool final)
        {
            convertToBlockImpl(method, method.data.impls[bucket],
                key_columns, aggregate_columns, final_aggregate_columns, key_sizes, final);
        });

    block.info.bucket_num = bucket;
    return block;
}

template <typename Method>
void Aggregator::writeToTemporaryFileImpl(
    AggregatedDataVariants & data_variants,
    Method & method,
    IBlockOutputStream & out,
    const String & path)
{
    size_t max_temporary_block_size_rows = 0;
    size_t max_temporary_block_size_bytes = 0;

    auto update_max_sizes = [&](const Block & block)
    {
        size_t block_size_rows = block.rows();
        size_t block_size_bytes = block.bytes();

        if (block_size_rows > max_temporary_block_size_rows)
            max_temporary_block_size_rows = block_size_rows;
        if (block_size_bytes > max_temporary_block_size_bytes)
            max_temporary_block_size_bytes = block_size_bytes;
    };

    for (size_t bucket = 0; bucket < Method::Data::NUM_BUCKETS; ++bucket)
    {
        Block block = convertOneBucketToBlock(data_variants, method, false, bucket);
        out.write(block);
        update_max_sizes(block);
    }

    if (params.overflow_row)
    {
        Block block = prepareBlockAndFillWithoutKey(data_variants, false, true);
        out.write(block);
        update_max_sizes(block);
    }

    /// Pass ownership of the aggregate function states:
    /// `data_variants` will not destroy them in its destructor; they are now owned by ColumnAggregateFunction objects.
    data_variants.aggregator = nullptr;

    LOG_TRACE(log, std::fixed << std::setprecision(3)
        << "Max size of temporary block: " << max_temporary_block_size_rows << " rows, "
        << (max_temporary_block_size_bytes / 1048576.0) << " MiB.");
}

bool Aggregator::checkLimits(size_t result_size, bool & no_more_keys) const
{
    if (!no_more_keys && params.max_rows_to_group_by && result_size > params.max_rows_to_group_by)
    {
        switch (params.group_by_overflow_mode)
        {
            case OverflowMode::THROW:
                throw Exception("Limit for rows to GROUP BY exceeded: has " + toString(result_size)
                    + " rows, maximum: " + toString(params.max_rows_to_group_by),
                    ErrorCodes::TOO_MUCH_ROWS);

            case OverflowMode::BREAK:
                return false;

            case OverflowMode::ANY:
                no_more_keys = true;
                break;
        }
    }

    return true;
}

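/** For illustration: the branches above correspond to the `group_by_overflow_mode` setting
  * once `max_rows_to_group_by` is exceeded:
  *
  *     'throw' - abort the query with a TOO_MUCH_ROWS exception;
  *     'break' - stop reading further data and return the partial result;
  *     'any'   - keep aggregating, but only for keys that already got into the hash table
  *               (new keys go to the overflow row if params.overflow_row is set).
  */
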
void Aggregator::execute(const BlockInputStreamPtr & stream, AggregatedDataVariants & result)
{
    if (isCancelled())
        return;

    StringRefs key(params.keys_size);
    ConstColumnPlainPtrs key_columns(params.keys_size);
    AggregateColumns aggregate_columns(params.aggregates_size);
    Sizes key_sizes;

    /** Used if there is a limit on the maximum number of rows in the aggregation,
      * and group_by_overflow_mode == ANY.
      * In this case, new keys are not added to the set, and aggregation is performed only for
      * keys that have already made it into the set.
      */
    bool no_more_keys = false;

    LOG_TRACE(log, "Aggregating");

    Stopwatch watch;

    size_t src_rows = 0;
    size_t src_bytes = 0;

    /// Read all the data.
    while (Block block = stream->read())
    {
        if (isCancelled())
            return;

        src_rows += block.rows();
        src_bytes += block.bytes();

        if (!executeOnBlock(block, result,
            key_columns, aggregate_columns, key_sizes, key,
            no_more_keys))
            break;
    }

    double elapsed_seconds = watch.elapsedSeconds();
    size_t rows = result.size();
    LOG_TRACE(log, std::fixed << std::setprecision(3)
        << "Aggregated. " << src_rows << " to " << rows << " rows (from " << src_bytes / 1048576.0 << " MiB)"
        << " in " << elapsed_seconds << " sec."
        << " (" << src_rows / elapsed_seconds << " rows/sec., " << src_bytes / elapsed_seconds / 1048576.0 << " MiB/sec.)");
}

template <typename Method, typename Table>
void Aggregator::convertToBlockImpl(
    Method & method,
    Table & data,
    ColumnPlainPtrs & key_columns,
    AggregateColumnsData & aggregate_columns,
    ColumnPlainPtrs & final_aggregate_columns,
    const Sizes & key_sizes,
    bool final) const
{
    if (data.empty())
        return;

    if (final)
        convertToBlockImplFinal(method, data, key_columns, final_aggregate_columns, key_sizes);
    else
        convertToBlockImplNotFinal(method, data, key_columns, aggregate_columns, key_sizes);

    /// In order to release memory early.
    data.clearAndShrink();
}

template <typename Method, typename Table>
void NO_INLINE Aggregator::convertToBlockImplFinal(
    Method & method,
    Table & data,
    ColumnPlainPtrs & key_columns,
    ColumnPlainPtrs & final_aggregate_columns,
    const Sizes & key_sizes) const
{
    for (const auto & value : data)
    {
        method.insertKeyIntoColumns(value, key_columns, params.keys_size, key_sizes);

        for (size_t i = 0; i < params.aggregates_size; ++i)
            aggregate_functions[i]->insertResultInto(
                Method::getAggregateData(value.second) + offsets_of_aggregate_states[i],
                *final_aggregate_columns[i]);
    }

    destroyImpl(method, data);    /// NOTE You can do better.
}

template <typename Method, typename Table>
void NO_INLINE Aggregator::convertToBlockImplNotFinal(
    Method & method,
    Table & data,
    ColumnPlainPtrs & key_columns,
    AggregateColumnsData & aggregate_columns,
    const Sizes & key_sizes) const
{
    for (auto & value : data)
    {
        method.insertKeyIntoColumns(value, key_columns, params.keys_size, key_sizes);

        /// Reserved beforehand, so push_back does not throw exceptions.
        for (size_t i = 0; i < params.aggregates_size; ++i)
            aggregate_columns[i]->push_back(Method::getAggregateData(value.second) + offsets_of_aggregate_states[i]);

        Method::getAggregateData(value.second) = nullptr;
    }
}

template <typename Filler>
Block Aggregator::prepareBlockAndFill(
    AggregatedDataVariants & data_variants,
    bool final,
    size_t rows,
    Filler && filler) const
{
    Block res = sample.cloneEmpty();

    ColumnPlainPtrs key_columns(params.keys_size);
    AggregateColumnsData aggregate_columns(params.aggregates_size);
    ColumnPlainPtrs final_aggregate_columns(params.aggregates_size);

    for (size_t i = 0; i < params.keys_size; ++i)
    {
        key_columns[i] = res.safeGetByPosition(i).column.get();
        key_columns[i]->reserve(rows);
    }

    for (size_t i = 0; i < params.aggregates_size; ++i)
    {
        if (!final)
        {
            /// The ColumnAggregateFunction column captures shared ownership of the arena with the aggregate function states.
            ColumnAggregateFunction & column_aggregate_func = static_cast<ColumnAggregateFunction &>(
                *res.safeGetByPosition(i + params.keys_size).column);

            for (size_t j = 0; j < data_variants.aggregates_pools.size(); ++j)
                column_aggregate_func.addArena(data_variants.aggregates_pools[j]);

            aggregate_columns[i] = &column_aggregate_func.getData();
            aggregate_columns[i]->reserve(rows);
        }
        else
        {
            ColumnWithTypeAndName & column = res.safeGetByPosition(i + params.keys_size);
            column.type = aggregate_functions[i]->getReturnType();
            column.column = column.type->createColumn();
            column.column->reserve(rows);

            if (aggregate_functions[i]->isState())
            {
                /// The ColumnAggregateFunction column captures shared ownership of the arena with the aggregate function states.
                ColumnAggregateFunction & column_aggregate_func = static_cast<ColumnAggregateFunction &>(*column.column);

                for (size_t j = 0; j < data_variants.aggregates_pools.size(); ++j)
                    column_aggregate_func.addArena(data_variants.aggregates_pools[j]);
            }

            final_aggregate_columns[i] = column.column.get();
        }
    }

    filler(key_columns, aggregate_columns, final_aggregate_columns, data_variants.key_sizes, final);

    /// Change the size of the constant columns in the block.
    size_t columns = res.columns();
    for (size_t i = 0; i < columns; ++i)
        if (res.safeGetByPosition(i).column->isConst())
            res.safeGetByPosition(i).column = res.safeGetByPosition(i).column->cut(0, rows);

    return res;
}

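/** For illustration: `filler` is the callback that actually writes the data;
  * prepareBlockAndFill only prepares the destination columns. Callers pass a lambda of this shape:
  *
  *     auto filler = [&](ColumnPlainPtrs & key_columns,
  *                       AggregateColumnsData & aggregate_columns,
  *                       ColumnPlainPtrs & final_aggregate_columns,
  *                       const Sizes & key_sizes, bool final)
  *     {
  *         /// Fill key columns and either the state columns (if !final) or the result columns (if final).
  *     };
  */
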
Block Aggregator::prepareBlockAndFillWithoutKey(AggregatedDataVariants & data_variants, bool final, bool is_overflows) const
{
    size_t rows = 1;

    auto filler = [&data_variants, this] (
        ColumnPlainPtrs & key_columns,
        AggregateColumnsData & aggregate_columns,
        ColumnPlainPtrs & final_aggregate_columns,
        const Sizes & key_sizes,
        bool final)
    {
        if (data_variants.type == AggregatedDataVariants::Type::without_key || params.overflow_row)
        {
            AggregatedDataWithoutKey & data = data_variants.without_key;

            for (size_t i = 0; i < params.aggregates_size; ++i)
            {
                if (!final)
                    aggregate_columns[i]->push_back(data + offsets_of_aggregate_states[i]);
                else
                    aggregate_functions[i]->insertResultInto(data + offsets_of_aggregate_states[i], *final_aggregate_columns[i]);
            }

            if (!final)
                data = nullptr;

            if (params.overflow_row)
                for (size_t i = 0; i < params.keys_size; ++i)
                    key_columns[i]->insertDefault();
        }
    };

    Block block = prepareBlockAndFill(data_variants, final, rows, filler);

    if (is_overflows)
        block.info.is_overflows = true;

    if (final)
        destroyWithoutKey(data_variants);

    return block;
}

Block Aggregator::prepareBlockAndFillSingleLevel(AggregatedDataVariants & data_variants, bool final) const
{
    size_t rows = data_variants.sizeWithoutOverflowRow();

    auto filler = [&data_variants, this] (
        ColumnPlainPtrs & key_columns,
        AggregateColumnsData & aggregate_columns,
        ColumnPlainPtrs & final_aggregate_columns,
        const Sizes & key_sizes,
        bool final)
    {
    #define M(NAME) \
        else if (data_variants.type == AggregatedDataVariants::Type::NAME) \
            convertToBlockImpl(*data_variants.NAME, data_variants.NAME->data, \
                key_columns, aggregate_columns, final_aggregate_columns, data_variants.key_sizes, final);

        if (false) {}
        APPLY_FOR_VARIANTS_SINGLE_LEVEL(M)
    #undef M
        else
            throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
    };

    return prepareBlockAndFill(data_variants, final, rows, filler);
}

BlocksList Aggregator::prepareBlocksAndFillTwoLevel(AggregatedDataVariants & data_variants, bool final, ThreadPool * thread_pool) const
{
#define M(NAME) \
    else if (data_variants.type == AggregatedDataVariants::Type::NAME) \
        return prepareBlocksAndFillTwoLevelImpl(data_variants, *data_variants.NAME, final, thread_pool);

    if (false) {}
    APPLY_FOR_VARIANTS_TWO_LEVEL(M)
#undef M
    else
        throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
}

template <typename Method>
BlocksList Aggregator::prepareBlocksAndFillTwoLevelImpl(
    AggregatedDataVariants & data_variants,
    Method & method,
    bool final,
    ThreadPool * thread_pool) const
{
    auto converter = [&](size_t bucket, MemoryTracker * memory_tracker)
    {
        current_memory_tracker = memory_tracker;
        return convertOneBucketToBlock(data_variants, method, final, bucket);
    };

    /// packaged_task is used so that exceptions are automatically propagated to the main thread.
    std::vector<std::packaged_task<Block()>> tasks(Method::Data::NUM_BUCKETS);

    try
    {
        for (size_t bucket = 0; bucket < Method::Data::NUM_BUCKETS; ++bucket)
        {
            if (method.data.impls[bucket].empty())
                continue;

            tasks[bucket] = std::packaged_task<Block()>(std::bind(converter, bucket, current_memory_tracker));

            if (thread_pool)
                thread_pool->schedule([bucket, &tasks] { tasks[bucket](); });
            else
                tasks[bucket]();
        }
    }
    catch (...)
    {
        /// If this is not done, then in case of an exception the tasks would be destroyed before the threads have finished, which would be bad.
        if (thread_pool)
            thread_pool->wait();

        throw;
    }

    if (thread_pool)
        thread_pool->wait();

    BlocksList blocks;

    for (auto & task : tasks)
    {
        if (!task.valid())
            continue;

        blocks.emplace_back(task.get_future().get());
    }

    return blocks;
}

BlocksList Aggregator::convertToBlocks(AggregatedDataVariants & data_variants, bool final, size_t max_threads) const
{
    if (isCancelled())
        return BlocksList();

    LOG_TRACE(log, "Converting aggregated data to blocks");

    Stopwatch watch;

    BlocksList blocks;

    /// In what data structure is the data aggregated?
    if (data_variants.empty())
        return blocks;

    std::unique_ptr<ThreadPool> thread_pool;
    if (max_threads > 1 && data_variants.sizeWithoutOverflowRow() > 100000    /// TODO Make a custom threshold.
        && data_variants.isTwoLevel())                                        /// TODO Use the shared thread pool with the `merge` function.
        thread_pool = std::make_unique<ThreadPool>(max_threads);

    if (isCancelled())
        return BlocksList();

    if (data_variants.without_key)
        blocks.emplace_back(prepareBlockAndFillWithoutKey(
            data_variants, final, data_variants.type != AggregatedDataVariants::Type::without_key));

    if (isCancelled())
        return BlocksList();

    if (data_variants.type != AggregatedDataVariants::Type::without_key)
    {
        if (!data_variants.isTwoLevel())
            blocks.emplace_back(prepareBlockAndFillSingleLevel(data_variants, final));
        else
            blocks.splice(blocks.end(), prepareBlocksAndFillTwoLevel(data_variants, final, thread_pool.get()));
    }

    if (!final)
    {
        /// data_variants will not destroy the states of aggregate functions in its destructor.
        /// Now the ColumnAggregateFunction columns own the states.
        data_variants.aggregator = nullptr;
    }

    if (isCancelled())
        return BlocksList();

    size_t rows = 0;
    size_t bytes = 0;

    for (const auto & block : blocks)
    {
        rows += block.rows();
        bytes += block.bytes();
    }

    double elapsed_seconds = watch.elapsedSeconds();
    LOG_TRACE(log, std::fixed << std::setprecision(3)
        << "Converted aggregated data to blocks. "
        << rows << " rows, " << bytes / 1048576.0 << " MiB"
        << " in " << elapsed_seconds << " sec."
        << " (" << rows / elapsed_seconds << " rows/sec., " << bytes / elapsed_seconds / 1048576.0 << " MiB/sec.)");

    return blocks;
}


template <typename Method, typename Table>
void NO_INLINE Aggregator::mergeDataImpl(
    Table & table_dst,
    Table & table_src,
    Arena * arena) const
{
    for (auto it = table_src.begin(); it != table_src.end(); ++it)
    {
        decltype(it) res_it;
        bool inserted;
        table_dst.emplace(it->first, res_it, inserted, it.getHash());

        if (!inserted)
        {
            for (size_t i = 0; i < params.aggregates_size; ++i)
                aggregate_functions[i]->merge(
                    Method::getAggregateData(res_it->second) + offsets_of_aggregate_states[i],
                    Method::getAggregateData(it->second) + offsets_of_aggregate_states[i],
                    arena);

            for (size_t i = 0; i < params.aggregates_size; ++i)
                aggregate_functions[i]->destroy(
                    Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]);
        }
        else
        {
            res_it->second = it->second;
        }

        Method::getAggregateData(it->second) = nullptr;
    }

    table_src.clearAndShrink();
}
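
/** Illustrative sketch (not part of the build): the emplace-or-merge pattern above, shown on
  * std::unordered_map with plain integer "states". If a key is new, the source state is moved
  * into the destination; if it already exists, the two partial states are combined and the
  * source state is discarded:
  *
  *     std::unordered_map<std::string, int> dst, src;
  *     for (auto & kv : src)
  *     {
  *         auto res = dst.emplace(kv.first, kv.second);
  *         if (!res.second)
  *             res.first->second += kv.second;   /// "merge": combine the two partial states
  *     }
  *     src.clear();
  */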

template <typename Method, typename Table>
void NO_INLINE Aggregator::mergeDataNoMoreKeysImpl(
    Table & table_dst,
    AggregatedDataWithoutKey & overflows,
    Table & table_src,
    Arena * arena) const
{
    for (auto it = table_src.begin(); it != table_src.end(); ++it)
    {
        decltype(it) res_it = table_dst.find(it->first, it.getHash());

        AggregateDataPtr res_data = table_dst.end() == res_it
            ? overflows
            : Method::getAggregateData(res_it->second);

        for (size_t i = 0; i < params.aggregates_size; ++i)
            aggregate_functions[i]->merge(
                res_data + offsets_of_aggregate_states[i],
                Method::getAggregateData(it->second) + offsets_of_aggregate_states[i],
                arena);

        for (size_t i = 0; i < params.aggregates_size; ++i)
            aggregate_functions[i]->destroy(
                Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]);

        Method::getAggregateData(it->second) = nullptr;
    }

    table_src.clearAndShrink();
}

template <typename Method, typename Table>
void NO_INLINE Aggregator::mergeDataOnlyExistingKeysImpl(
    Table & table_dst,
    Table & table_src,
    Arena * arena) const
{
    for (auto it = table_src.begin(); it != table_src.end(); ++it)
    {
        decltype(it) res_it = table_dst.find(it->first, it.getHash());

        if (table_dst.end() == res_it)
            continue;

        AggregateDataPtr res_data = Method::getAggregateData(res_it->second);

        for (size_t i = 0; i < params.aggregates_size; ++i)
            aggregate_functions[i]->merge(
                res_data + offsets_of_aggregate_states[i],
                Method::getAggregateData(it->second) + offsets_of_aggregate_states[i],
                arena);

        for (size_t i = 0; i < params.aggregates_size; ++i)
            aggregate_functions[i]->destroy(
                Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]);

        Method::getAggregateData(it->second) = nullptr;
    }

    table_src.clearAndShrink();
}
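
/** Illustrative sketch (not part of the build): the three merge variants above differ only in
  * how they handle a source key that is absent from the destination. Shown on
  * std::unordered_map with int states and a hypothetical Policy enum:
  *
  *     enum class Policy { MoveIn, ToOverflows, Drop };
  *     void merge_missing(std::unordered_map<std::string, int> & dst, int & overflows,
  *                        const std::string & key, int src_state, Policy policy)
  *     {
  *         if (policy == Policy::MoveIn)           dst.emplace(key, src_state);  /// mergeDataImpl
  *         else if (policy == Policy::ToOverflows) overflows += src_state;       /// mergeDataNoMoreKeysImpl
  *         /// Policy::Drop: mergeDataOnlyExistingKeysImpl - the source state is simply destroyed.
  *     }
  */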

void NO_INLINE Aggregator::mergeWithoutKeyDataImpl(
    ManyAggregatedDataVariants & non_empty_data) const
{
    AggregatedDataVariantsPtr & res = non_empty_data[0];

    /// We merge all aggregation results into the first.
    for (size_t i = 1, size = non_empty_data.size(); i < size; ++i)
    {
        AggregatedDataWithoutKey & res_data = res->without_key;
        AggregatedDataWithoutKey & current_data = non_empty_data[i]->without_key;

        for (size_t j = 0; j < params.aggregates_size; ++j)
            aggregate_functions[j]->merge(res_data + offsets_of_aggregate_states[j], current_data + offsets_of_aggregate_states[j], res->aggregates_pool);

        for (size_t j = 0; j < params.aggregates_size; ++j)
            aggregate_functions[j]->destroy(current_data + offsets_of_aggregate_states[j]);

        current_data = nullptr;
    }
}

template <typename Method>
void NO_INLINE Aggregator::mergeSingleLevelDataImpl(
    ManyAggregatedDataVariants & non_empty_data) const
{
    AggregatedDataVariantsPtr & res = non_empty_data[0];
    bool no_more_keys = false;

    /// We merge all aggregation results into the first.
    for (size_t i = 1, size = non_empty_data.size(); i < size; ++i)
    {
        if (!checkLimits(res->sizeWithoutOverflowRow(), no_more_keys))
            break;

        AggregatedDataVariants & current = *non_empty_data[i];

        if (!no_more_keys)
            mergeDataImpl<Method>(
                getDataVariant<Method>(*res).data,
                getDataVariant<Method>(current).data,
                res->aggregates_pool);
        else if (res->without_key)
            mergeDataNoMoreKeysImpl<Method>(
                getDataVariant<Method>(*res).data,
                res->without_key,
                getDataVariant<Method>(current).data,
                res->aggregates_pool);
        else
            mergeDataOnlyExistingKeysImpl<Method>(
                getDataVariant<Method>(*res).data,
                getDataVariant<Method>(current).data,
                res->aggregates_pool);

        /// `current` will not destroy the states of aggregate functions in the destructor.
        current.aggregator = nullptr;
    }
}

template <typename Method>
void NO_INLINE Aggregator::mergeBucketImpl(
    ManyAggregatedDataVariants & data, Int32 bucket, Arena * arena) const
{
    /// We merge all aggregation results into the first.
    AggregatedDataVariantsPtr & res = data[0];

    for (size_t i = 1, size = data.size(); i < size; ++i)
    {
        AggregatedDataVariants & current = *data[i];

        mergeDataImpl<Method>(
            getDataVariant<Method>(*res).data.impls[bucket],
            getDataVariant<Method>(current).data.impls[bucket],
            arena);
    }
}
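
/** Illustrative sketch (not part of the build): a two-level hash table is NUM_BUCKETS disjoint
  * sub-tables selected by key hash, so distinct buckets can be merged by different threads
  * without any locking. A minimal form with a hypothetical merge_one_bucket():
  *
  *     std::vector<std::thread> workers;
  *     for (size_t bucket = 0; bucket < num_buckets; ++bucket)
  *         workers.emplace_back([&, bucket] { merge_one_bucket(dst, src, bucket); });
  *     for (auto & w : workers)
  *         w.join();   /// each worker touched only impls[bucket] - no shared mutable state
  */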

/** Combines aggregation states together, turns them into blocks, and outputs them as a stream.
  * If the aggregation states are two-level, then it produces blocks strictly in order of 'bucket_num'.
  * (This is important for distributed processing.)
  * In doing so, it can process different buckets in parallel, using up to `threads` threads.
  */
class MergingAndConvertingBlockInputStream : public IProfilingBlockInputStream
{
public:
    /** The input is a set of non-empty sets of partially aggregated data,
      * which are either all single-level, or all two-level.
      */
    MergingAndConvertingBlockInputStream(const Aggregator & aggregator_, ManyAggregatedDataVariants & data_, bool final_, size_t threads_)
        : aggregator(aggregator_), data(data_), final(final_), threads(threads_)
    {
        /// We need at least one arena per thread in the first data item.
        if (!data.empty() && threads > data[0]->aggregates_pools.size())
        {
            Arenas & first_pool = data[0]->aggregates_pools;
            for (size_t j = first_pool.size(); j < threads; j++)
                first_pool.emplace_back(std::make_shared<Arena>());
        }
    }

    String getName() const override { return "MergingAndConverting"; }

    String getID() const override
    {
        std::stringstream res;
        res << this;
        return res.str();
    }

protected:
    Block readImpl() override
    {
        if (data.empty())
            return {};

        if (current_bucket_num >= NUM_BUCKETS)
            return {};

        AggregatedDataVariantsPtr & first = data[0];

        if (current_bucket_num == -1)
        {
            ++current_bucket_num;

            if (first->type == AggregatedDataVariants::Type::without_key || aggregator.params.overflow_row)
            {
                aggregator.mergeWithoutKeyDataImpl(data);
                return aggregator.prepareBlockAndFillWithoutKey(
                    *first, final, first->type != AggregatedDataVariants::Type::without_key);
            }
        }

        if (!first->isTwoLevel())
        {
            if (current_bucket_num > 0)
                return {};

            if (first->type == AggregatedDataVariants::Type::without_key)
                return {};

            ++current_bucket_num;

        #define M(NAME) \
            else if (first->type == AggregatedDataVariants::Type::NAME) \
                aggregator.mergeSingleLevelDataImpl<decltype(first->NAME)::element_type>(data);

            if (false) {}
            APPLY_FOR_VARIANTS_SINGLE_LEVEL(M)
        #undef M
            else
                throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);

            return aggregator.prepareBlockAndFillSingleLevel(*first, final);
        }
        else
        {
            if (!parallel_merge_data)
            {
                parallel_merge_data = std::make_unique<ParallelMergeData>(threads);
                for (size_t i = 0; i < threads; ++i)
                    scheduleThreadForNextBucket();
            }

            Block res;

            while (true)
            {
                std::unique_lock<std::mutex> lock(parallel_merge_data->mutex);

                if (parallel_merge_data->exception)
                    std::rethrow_exception(parallel_merge_data->exception);

                auto it = parallel_merge_data->ready_blocks.find(current_bucket_num);
                if (it != parallel_merge_data->ready_blocks.end())
                {
                    ++current_bucket_num;
                    scheduleThreadForNextBucket();

                    if (it->second)
                    {
                        res.swap(it->second);
                        break;
                    }
                    else if (current_bucket_num >= NUM_BUCKETS)
                        break;
                }

                parallel_merge_data->condvar.wait(lock);
            }

            return res;
        }
    }

private:
    const Aggregator & aggregator;
    ManyAggregatedDataVariants data;
    bool final;
    size_t threads;

    Int32 current_bucket_num = -1;
    Int32 max_scheduled_bucket_num = -1;
    static constexpr Int32 NUM_BUCKETS = 256;

    struct ParallelMergeData
    {
        ThreadPool pool;
        std::map<Int32, Block> ready_blocks;
        std::exception_ptr exception;
        std::mutex mutex;
        std::condition_variable condvar;

        explicit ParallelMergeData(size_t threads) : pool(threads) {}

        ~ParallelMergeData()
        {
            LOG_TRACE(&Logger::get(__PRETTY_FUNCTION__), "Waiting for threads to finish");
            pool.wait();
        }
    };

    std::unique_ptr<ParallelMergeData> parallel_merge_data;

    void scheduleThreadForNextBucket()
    {
        ++max_scheduled_bucket_num;
        if (max_scheduled_bucket_num >= NUM_BUCKETS)
            return;

        parallel_merge_data->pool.schedule(std::bind(&MergingAndConvertingBlockInputStream::thread, this,
            max_scheduled_bucket_num, current_memory_tracker));
    }

    void thread(Int32 bucket_num, MemoryTracker * memory_tracker)
    {
        current_memory_tracker = memory_tracker;
        setThreadName("MergingAggregtd");
        CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};

        try
        {
            /// TODO: add no_more_keys support maybe

            auto & merged_data = *data[0];
            auto method = merged_data.type;
            Block block;

            /// Select an Arena to avoid race conditions.
            size_t thread_number = static_cast<size_t>(bucket_num) % threads;
            Arena * arena = merged_data.aggregates_pools.at(thread_number).get();

            if (false) {}
        #define M(NAME) \
            else if (method == AggregatedDataVariants::Type::NAME) \
            { \
                aggregator.mergeBucketImpl<decltype(merged_data.NAME)::element_type>(data, bucket_num, arena); \
                block = aggregator.convertOneBucketToBlock(merged_data, *merged_data.NAME, final, bucket_num); \
            }

            APPLY_FOR_VARIANTS_TWO_LEVEL(M)
        #undef M

            std::lock_guard<std::mutex> lock(parallel_merge_data->mutex);
            parallel_merge_data->ready_blocks[bucket_num] = std::move(block);
        }
        catch (...)
        {
            std::lock_guard<std::mutex> lock(parallel_merge_data->mutex);
            if (!parallel_merge_data->exception)
                parallel_merge_data->exception = std::current_exception();
        }

        parallel_merge_data->condvar.notify_all();
    }
};
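
/** Illustrative sketch (not part of the build): how the structures above yield strictly ordered
  * output. Workers finish buckets in arbitrary order and deposit results into a std::map keyed
  * by bucket number; the reader only ever consumes the entry matching its current bucket,
  * waiting on the condition variable until it appears. Hypothetical names:
  *
  *     std::map<int, Block> ready;        /// filled by workers under `mutex`
  *     std::unique_lock<std::mutex> lock(mutex);
  *     condvar.wait(lock, [&] { return ready.count(current) > 0; });
  *     Block next = std::move(ready[current]);   /// delivered strictly in bucket order
  *     ++current;
  */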

std::unique_ptr<IBlockInputStream> Aggregator::mergeAndConvertToBlocks(
    ManyAggregatedDataVariants & data_variants, bool final, size_t max_threads) const
{
    if (data_variants.empty())
        throw Exception("Empty data passed to Aggregator::mergeAndConvertToBlocks.", ErrorCodes::EMPTY_DATA_PASSED);

    LOG_TRACE(log, "Merging aggregated data");

    ManyAggregatedDataVariants non_empty_data;
    non_empty_data.reserve(data_variants.size());
    for (auto & data : data_variants)
        if (!data->empty())
            non_empty_data.push_back(data);

    if (non_empty_data.empty())
        return std::make_unique<NullBlockInputStream>();

    if (non_empty_data.size() > 1)
    {
        /// Sort the states in descending order of size, so that the merge is more efficient (since all states are merged into the first).
        std::sort(non_empty_data.begin(), non_empty_data.end(),
            [](const AggregatedDataVariantsPtr & lhs, const AggregatedDataVariantsPtr & rhs)
            {
                return lhs->sizeWithoutOverflowRow() > rhs->sizeWithoutOverflowRow();
            });
    }

    /// If at least one of the variants is two-level, then convert all the others to two-level as well.
    /// Note - perhaps it would be more optimal not to convert single-level variants before the merge, but to merge them separately at the end.

    bool has_at_least_one_two_level = false;
    for (const auto & variant : non_empty_data)
    {
        if (variant->isTwoLevel())
        {
            has_at_least_one_two_level = true;
            break;
        }
    }

    if (has_at_least_one_two_level)
        for (auto & variant : non_empty_data)
            if (!variant->isTwoLevel())
                variant->convertToTwoLevel();

    AggregatedDataVariantsPtr & first = non_empty_data[0];

    for (size_t i = 1, size = non_empty_data.size(); i < size; ++i)
    {
        if (first->type != non_empty_data[i]->type)
            throw Exception("Cannot merge different aggregated data variants.", ErrorCodes::CANNOT_MERGE_DIFFERENT_AGGREGATED_DATA_VARIANTS);

        /** Elements from the remaining sets can be moved into the first data set.
          * Therefore, it must own all the arenas of all the other sets.
          */
        first->aggregates_pools.insert(first->aggregates_pools.end(),
            non_empty_data[i]->aggregates_pools.begin(), non_empty_data[i]->aggregates_pools.end());
    }

    return std::make_unique<MergingAndConvertingBlockInputStream>(*this, non_empty_data, final, max_threads);
}
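
/** Illustrative note (not part of the build): sorting by descending size matters because the
  * merge moves or combines every state of the *source* tables into the first (destination)
  * table. With sets of sizes {1000000, 10000, 5000}, merging into the largest touches only
  * 10000 + 5000 = 15000 source states; merging into one of the small sets would instead
  * touch over a million.
  */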

template <bool no_more_keys, typename Method, typename Table>
void NO_INLINE Aggregator::mergeStreamsImplCase(
    Block & block,
    const Sizes & key_sizes,
    Arena * aggregates_pool,
    Method & method,
    Table & data,
    AggregateDataPtr overflow_row) const
{
    ConstColumnPlainPtrs key_columns(params.keys_size);
    AggregateColumnsData aggregate_columns(params.aggregates_size);

    /// Remember the columns we will work with.
    for (size_t i = 0; i < params.keys_size; ++i)
        key_columns[i] = block.safeGetByPosition(i).column.get();

    for (size_t i = 0; i < params.aggregates_size; ++i)
        aggregate_columns[i] = &typeid_cast<ColumnAggregateFunction &>(*block.safeGetByPosition(params.keys_size + i).column).getData();

    typename Method::State state;
    state.init(key_columns);

    /// For all rows.
    StringRefs keys(params.keys_size);
    size_t rows = block.rows();
    for (size_t i = 0; i < rows; ++i)
    {
        typename Table::iterator it;
        bool inserted;          /// Was a new key inserted, or did it already exist?
        bool overflow = false;  /// A new key did not fit in the hash table because of no_more_keys.

        /// Get the key to insert into the hash table.
        auto key = state.getKey(key_columns, params.keys_size, i, key_sizes, keys, *aggregates_pool);

        if (!no_more_keys)
        {
            data.emplace(key, it, inserted);
        }
        else
        {
            inserted = false;
            it = data.find(key);
            if (data.end() == it)
                overflow = true;
        }

        /// If the key does not fit, and the data does not need to be aggregated into a separate (overflow) row, then there is nothing to do.
        if (no_more_keys && overflow && !overflow_row)
        {
            method.onExistingKey(key, keys, *aggregates_pool);
            continue;
        }

        /// If a new key was inserted, initialize the states of the aggregate functions, and possibly something related to the key.
        if (inserted)
        {
            AggregateDataPtr & aggregate_data = Method::getAggregateData(it->second);
            aggregate_data = nullptr;

            method.onNewKey(*it, params.keys_size, i, keys, *aggregates_pool);

            AggregateDataPtr place = aggregates_pool->alloc(total_size_of_aggregate_states);
            createAggregateStates(place);
            aggregate_data = place;
        }
        else
            method.onExistingKey(key, keys, *aggregates_pool);

        AggregateDataPtr value = (!no_more_keys || !overflow) ? Method::getAggregateData(it->second) : overflow_row;

        /// Merge the states of the aggregate functions.
        for (size_t j = 0; j < params.aggregates_size; ++j)
            aggregate_functions[j]->merge(
                value + offsets_of_aggregate_states[j],
                (*aggregate_columns[j])[i],
                aggregates_pool);
    }

    /// Release memory early.
    block.clear();
}

template <typename Method, typename Table>
void NO_INLINE Aggregator::mergeStreamsImpl(
    Block & block,
    const Sizes & key_sizes,
    Arena * aggregates_pool,
    Method & method,
    Table & data,
    AggregateDataPtr overflow_row,
    bool no_more_keys) const
{
    if (!no_more_keys)
        mergeStreamsImplCase<false>(block, key_sizes, aggregates_pool, method, data, overflow_row);
    else
        mergeStreamsImplCase<true>(block, key_sizes, aggregates_pool, method, data, overflow_row);
}
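
/** Illustrative sketch (not part of the build): the dispatch above turns a runtime bool into a
  * compile-time template parameter, so the dead branch is eliminated from the hot per-row loop
  * instead of being tested on every iteration. The idiom in minimal form, with hypothetical names:
  *
  *     template <bool flag> void body();   /// inside, `flag` is a compile-time constant
  *     void dispatch(bool flag) { flag ? body<true>() : body<false>(); }
  */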

void NO_INLINE Aggregator::mergeWithoutKeyStreamsImpl(
    Block & block,
    AggregatedDataVariants & result) const
{
    AggregateColumnsData aggregate_columns(params.aggregates_size);

    /// Remember the columns we will work with.
    for (size_t i = 0; i < params.aggregates_size; ++i)
        aggregate_columns[i] = &typeid_cast<ColumnAggregateFunction &>(*block.safeGetByPosition(params.keys_size + i).column).getData();

    AggregatedDataWithoutKey & res = result.without_key;
    if (!res)
    {
        AggregateDataPtr place = result.aggregates_pool->alloc(total_size_of_aggregate_states);
        createAggregateStates(place);
        res = place;
    }

    /// Add the values.
    for (size_t i = 0; i < params.aggregates_size; ++i)
        aggregate_functions[i]->merge(res + offsets_of_aggregate_states[i], (*aggregate_columns[i])[0], result.aggregates_pool);

    /// Release memory early.
    block.clear();
}

void Aggregator::mergeStream(const BlockInputStreamPtr & stream, AggregatedDataVariants & result, size_t max_threads)
{
    if (isCancelled())
        return;

    StringRefs key(params.keys_size);
    ConstColumnPlainPtrs key_columns(params.keys_size);

    AggregateColumnsData aggregate_columns(params.aggregates_size);

    initialize({});

    if (isCancelled())
        return;

    /** If the remote servers used a two-level aggregation method,
      * then the blocks will contain the number of their bucket.
      * The calculations can then be parallelized by buckets.
      * We split the blocks up by the bucket numbers indicated in them.
      */
    using BucketToBlocks = std::map<Int32, BlocksList>;
    BucketToBlocks bucket_to_blocks;

    /// Read all the data.
    LOG_TRACE(log, "Reading blocks of partially aggregated data.");

    size_t total_input_rows = 0;
    size_t total_input_blocks = 0;
    while (Block block = stream->read())
    {
        if (isCancelled())
            return;

        total_input_rows += block.rows();
        ++total_input_blocks;
        bucket_to_blocks[block.info.bucket_num].emplace_back(std::move(block));
    }

    LOG_TRACE(log, "Read " << total_input_blocks << " blocks of partially aggregated data, total " << total_input_rows << " rows.");

    if (bucket_to_blocks.empty())
        return;

    setSampleBlock(bucket_to_blocks.begin()->second.front());

    /// How do we perform the aggregation?
    for (size_t i = 0; i < params.keys_size; ++i)
        key_columns[i] = sample.safeGetByPosition(i).column.get();

    Sizes key_sizes;
    AggregatedDataVariants::Type method = chooseAggregationMethod(key_columns, key_sizes);

    /** A bucket number of -1 means the absence of bucket information
      * - in the case of single-level aggregation, and also for blocks with "overflow" values.
      * If there is at least one block with a bucket number greater than zero, then two-level aggregation was used.
      */
    auto max_bucket = bucket_to_blocks.rbegin()->first;
    size_t has_two_level = max_bucket > 0;

    if (has_two_level)
    {
    #define M(NAME) \
        if (method == AggregatedDataVariants::Type::NAME) \
            method = AggregatedDataVariants::Type::NAME ## _two_level;

        APPLY_FOR_VARIANTS_CONVERTIBLE_TO_TWO_LEVEL(M)

    #undef M
    }

    if (isCancelled())
        return;

    /// result will destroy the states of aggregate functions in its destructor.
    result.aggregator = this;

    result.init(method);
    result.keys_size = params.keys_size;
    result.key_sizes = key_sizes;

    bool has_blocks_with_unknown_bucket = bucket_to_blocks.count(-1);

    /// First, parallelize the merge of the individual buckets. Then we continue with the merge of data not assigned to any bucket.
    if (has_two_level)
    {
        /** In this case, no_more_keys is not supported, because it is difficult to update
          * the shared state for "other" keys (overflows) from different threads.
          * That is, in the end the number of keys can be significantly larger than max_rows_to_group_by.
          */

        LOG_TRACE(log, "Merging partially aggregated two-level data.");

        auto merge_bucket = [&bucket_to_blocks, &result, &key_sizes, this](Int32 bucket, Arena * aggregates_pool, MemoryTracker * memory_tracker)
        {
            current_memory_tracker = memory_tracker;

            for (Block & block : bucket_to_blocks[bucket])
            {
                if (isCancelled())
                    return;

            #define M(NAME) \
                else if (result.type == AggregatedDataVariants::Type::NAME) \
                    mergeStreamsImpl(block, key_sizes, aggregates_pool, *result.NAME, result.NAME->data.impls[bucket], nullptr, false);

                if (false) {}
                APPLY_FOR_VARIANTS_TWO_LEVEL(M)
            #undef M
                else
                    throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
            }
        };

        std::unique_ptr<ThreadPool> thread_pool;
        if (max_threads > 1 && total_input_rows > 100000    /// TODO Make a custom threshold.
            && has_two_level)
            thread_pool = std::make_unique<ThreadPool>(max_threads);

        for (const auto & bucket_blocks : bucket_to_blocks)
        {
            const auto bucket = bucket_blocks.first;

            if (bucket == -1)
                continue;

            result.aggregates_pools.push_back(std::make_shared<Arena>());
            Arena * aggregates_pool = result.aggregates_pools.back().get();

            auto task = std::bind(merge_bucket, bucket, aggregates_pool, current_memory_tracker);

            if (thread_pool)
                thread_pool->schedule(task);
            else
                task();
        }

        if (thread_pool)
            thread_pool->wait();

        LOG_TRACE(log, "Merged partially aggregated two-level data.");
    }
    if (isCancelled())
    {
        result.invalidate();
        return;
    }

    if (has_blocks_with_unknown_bucket)
    {
        LOG_TRACE(log, "Merging partially aggregated single-level data.");

        bool no_more_keys = false;

        BlocksList & blocks = bucket_to_blocks[-1];
        for (Block & block : blocks)
        {
            if (isCancelled())
            {
                result.invalidate();
                return;
            }

            if (!checkLimits(result.sizeWithoutOverflowRow(), no_more_keys))
                break;

            if (result.type == AggregatedDataVariants::Type::without_key || block.info.is_overflows)
                mergeWithoutKeyStreamsImpl(block, result);

        #define M(NAME, IS_TWO_LEVEL) \
            else if (result.type == AggregatedDataVariants::Type::NAME) \
                mergeStreamsImpl(block, key_sizes, result.aggregates_pool, *result.NAME, result.NAME->data, result.without_key, no_more_keys);

            APPLY_FOR_AGGREGATED_VARIANTS(M)
        #undef M
            else if (result.type != AggregatedDataVariants::Type::without_key)
                throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
        }

        LOG_TRACE(log, "Merged partially aggregated single-level data.");
    }
}
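
/** Illustrative sketch (not part of the build): the decomposition step above, reduced to its
  * essence. Incoming blocks carry a bucket number (-1 when unknown); grouping them in a
  * std::map lets per-bucket lists be merged independently, while the -1 list is merged last,
  * single-threaded. read_next() is hypothetical:
  *
  *     std::map<int, std::list<Block>> by_bucket;
  *     while (Block b = read_next())
  *         by_bucket[b.info.bucket_num].emplace_back(std::move(b));
  *     /// by_bucket[-1] (if present) holds single-level and overflow blocks.
  */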

Block Aggregator::mergeBlocks(BlocksList & blocks, bool final)
{
    if (blocks.empty())
        return {};

    auto bucket_num = blocks.front().info.bucket_num;
    bool is_overflows = blocks.front().info.is_overflows;

    LOG_TRACE(log, "Merging partially aggregated blocks (bucket = " << bucket_num << ").");
    Stopwatch watch;

    StringRefs key(params.keys_size);
    ConstColumnPlainPtrs key_columns(params.keys_size);

    AggregateColumnsData aggregate_columns(params.aggregates_size);

    initialize({});
    setSampleBlock(blocks.front());

    /// How do we perform the aggregation?
    for (size_t i = 0; i < params.keys_size; ++i)
        key_columns[i] = sample.safeGetByPosition(i).column.get();

    Sizes key_sizes;
    AggregatedDataVariants::Type method = chooseAggregationMethod(key_columns, key_sizes);

    /** If possible, change 'method' to some_hash64. Otherwise, leave it as is.
      * A better hash function is needed because, during external aggregation,
      * we may merge partitions of data whose total number of keys is far greater than 4 billion.
      */

#define APPLY_FOR_VARIANTS_THAT_MAY_USE_BETTER_HASH_FUNCTION(M) \
    M(key64)            \
    M(key_string)       \
    M(key_fixed_string) \
    M(keys128)          \
    M(keys256)          \
    M(concat)           \
    M(serialized)       \

#define M(NAME) \
    if (method == AggregatedDataVariants::Type::NAME) \
        method = AggregatedDataVariants::Type::NAME ## _hash64; \

    APPLY_FOR_VARIANTS_THAT_MAY_USE_BETTER_HASH_FUNCTION(M)

#undef M
#undef APPLY_FOR_VARIANTS_THAT_MAY_USE_BETTER_HASH_FUNCTION

    /// Temporary data for aggregation.
    AggregatedDataVariants result;

    /// result will destroy the states of aggregate functions in its destructor.
    result.aggregator = this;

    result.init(method);
    result.keys_size = params.keys_size;
    result.key_sizes = key_sizes;

    for (Block & block : blocks)
    {
        if (isCancelled())
            return {};

        if (bucket_num >= 0 && block.info.bucket_num != bucket_num)
            bucket_num = -1;

        if (result.type == AggregatedDataVariants::Type::without_key || is_overflows)
            mergeWithoutKeyStreamsImpl(block, result);

    #define M(NAME, IS_TWO_LEVEL) \
        else if (result.type == AggregatedDataVariants::Type::NAME) \
            mergeStreamsImpl(block, key_sizes, result.aggregates_pool, *result.NAME, result.NAME->data, nullptr, false);

        APPLY_FOR_AGGREGATED_VARIANTS(M)
    #undef M
        else if (result.type != AggregatedDataVariants::Type::without_key)
            throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
    }

    if (isCancelled())
        return {};

    Block block;
    if (result.type == AggregatedDataVariants::Type::without_key || is_overflows)
        block = prepareBlockAndFillWithoutKey(result, final, is_overflows);
    else
        block = prepareBlockAndFillSingleLevel(result, final);
    /// NOTE: two-level data is not possible here - chooseAggregationMethod chooses only among single-level methods.

    if (!final)
    {
        /// Pass ownership of the aggregate function states from result to the ColumnAggregateFunction objects in the resulting block.
        result.aggregator = nullptr;
    }

    size_t rows = block.rows();
    size_t bytes = block.bytes();
    double elapsed_seconds = watch.elapsedSeconds();
    LOG_TRACE(log, std::fixed << std::setprecision(3)
        << "Merged partially aggregated blocks. "
        << rows << " rows, " << bytes / 1048576.0 << " MiB."
        << " in " << elapsed_seconds << " sec."
        << " (" << rows / elapsed_seconds << " rows/sec., " << bytes / elapsed_seconds / 1048576.0 << " MiB/sec.)");

    if (isCancelled())
        return {};

    block.info.bucket_num = bucket_num;
    return block;
}
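
/** Illustrative note (not part of the build): why a 64-bit hash is wanted above. A 32-bit hash
  * has only 2^32 ≈ 4.3 billion distinct values, so once the merged key count approaches that,
  * collisions become unavoidable rather than rare; with 2^64 values the same key counts keep
  * the expected collision rate negligible.
  */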

template <typename Method>
void NO_INLINE Aggregator::convertBlockToTwoLevelImpl(
    Method & method,
    Arena * pool,
    ConstColumnPlainPtrs & key_columns,
    const Sizes & key_sizes,
    StringRefs & keys,
    const Block & source,
    std::vector<Block> & destinations) const
{
    typename Method::State state;
    state.init(key_columns);

    size_t rows = source.rows();
    size_t columns = source.columns();

    /// Create a 'selector' that will contain the bucket index for every row. It will be used to scatter rows to buckets.
    IColumn::Selector selector(rows);

    /// For every row.
    for (size_t i = 0; i < rows; ++i)
    {
        /// Obtain a key. Calculate the bucket number from it.
        typename Method::Key key = state.getKey(key_columns, params.keys_size, i, key_sizes, keys, *pool);

        auto hash = method.data.hash(key);
        auto bucket = method.data.getBucketFromHash(hash);

        selector[i] = bucket;

        /// We don't need to store this key in the pool.
        method.onExistingKey(key, keys, *pool);
    }

    size_t num_buckets = destinations.size();

    for (size_t column_idx = 0; column_idx < columns; ++column_idx)
    {
        const ColumnWithTypeAndName & src_col = source.getByPosition(column_idx);
        Columns scattered_columns = src_col.column->scatter(num_buckets, selector);

        for (size_t bucket = 0, size = num_buckets; bucket < size; ++bucket)
        {
            if (!scattered_columns[bucket]->empty())
            {
                Block & dst = destinations[bucket];
                dst.info.bucket_num = bucket;
                dst.insert({scattered_columns[bucket], src_col.type, src_col.name});
            }

            /** Inserted columns of type ColumnAggregateFunction will own the states of the aggregate functions
              * by holding a shared_ptr to the source column. See ColumnAggregateFunction.h
              */
        }
    }
}
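
/** Illustrative sketch (not part of the build): the selector/scatter idea above, shown on a
  * plain vector. A selector assigns each row a destination index; scattering then splits the
  * column into one piece per destination in a single pass:
  *
  *     std::vector<int> column = {10, 11, 12, 13};
  *     std::vector<size_t> selector = {0, 2, 0, 1};     /// bucket index per row
  *     std::vector<std::vector<int>> buckets(3);
  *     for (size_t i = 0; i < column.size(); ++i)
  *         buckets[selector[i]].push_back(column[i]);   /// buckets: {10, 12}, {13}, {11}
  */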

std::vector<Block> Aggregator::convertBlockToTwoLevel(const Block & block)
{
    if (!block)
        return {};

    initialize({});
    setSampleBlock(block);

    AggregatedDataVariants data;

    StringRefs key(params.keys_size);
    ConstColumnPlainPtrs key_columns(params.keys_size);
    Sizes key_sizes;

    /// Remember the columns we will work with.
    for (size_t i = 0; i < params.keys_size; ++i)
        key_columns[i] = block.safeGetByPosition(i).column.get();

    AggregatedDataVariants::Type type = chooseAggregationMethod(key_columns, key_sizes);
    data.keys_size = params.keys_size;
    data.key_sizes = key_sizes;

#define M(NAME) \
    else if (type == AggregatedDataVariants::Type::NAME) \
        type = AggregatedDataVariants::Type::NAME ## _two_level;

    if (false) {}
    APPLY_FOR_VARIANTS_CONVERTIBLE_TO_TWO_LEVEL(M)
#undef M
    else
        throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);

    data.init(type);

    size_t num_buckets = 0;

#define M(NAME) \
    else if (data.type == AggregatedDataVariants::Type::NAME) \
        num_buckets = data.NAME->data.NUM_BUCKETS;

    if (false) {}
    APPLY_FOR_VARIANTS_TWO_LEVEL(M)
#undef M
    else
        throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);

    std::vector<Block> splitted_blocks(num_buckets);

#define M(NAME) \
    else if (data.type == AggregatedDataVariants::Type::NAME) \
        convertBlockToTwoLevelImpl(*data.NAME, data.aggregates_pool, \
            key_columns, data.key_sizes, key, block, splitted_blocks);

    if (false) {}
    APPLY_FOR_VARIANTS_TWO_LEVEL(M)
#undef M
    else
        throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);

    return splitted_blocks;
}

template <typename Method, typename Table>
void NO_INLINE Aggregator::destroyImpl(
    Method & method,
    Table & table) const
{
    for (auto elem : table)
    {
        AggregateDataPtr & data = Method::getAggregateData(elem.second);

        /** If an exception arose (usually out-of-memory, thrown by the MemoryTracker)
          * after the key was inserted into the hash table, but before all the states of the aggregate functions were created,
          * then data will be equal to nullptr.
          */
        if (nullptr == data)
            continue;

        for (size_t i = 0; i < params.aggregates_size; ++i)
            if (!aggregate_functions[i]->isState())
                aggregate_functions[i]->destroy(data + offsets_of_aggregate_states[i]);

        data = nullptr;
    }
}

void Aggregator::destroyWithoutKey(AggregatedDataVariants & result) const
{
    AggregatedDataWithoutKey & res_data = result.without_key;

    if (nullptr != res_data)
    {
        for (size_t i = 0; i < params.aggregates_size; ++i)
            if (!aggregate_functions[i]->isState())
                aggregate_functions[i]->destroy(res_data + offsets_of_aggregate_states[i]);

        res_data = nullptr;
    }
}

void Aggregator::destroyAllAggregateStates(AggregatedDataVariants & result)
{
    if (result.size() == 0)
        return;

    LOG_TRACE(log, "Destroying aggregate states");

    /// In what data structure is the data aggregated?
    if (result.type == AggregatedDataVariants::Type::without_key || params.overflow_row)
        destroyWithoutKey(result);

#define M(NAME, IS_TWO_LEVEL) \
    else if (result.type == AggregatedDataVariants::Type::NAME) \
        destroyImpl(*result.NAME, result.NAME->data);

    if (false) {}
    APPLY_FOR_AGGREGATED_VARIANTS(M)
#undef M
    else if (result.type != AggregatedDataVariants::Type::without_key)
        throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
}

String Aggregator::getID() const
{
    std::stringstream res;

    if (params.keys.empty())
    {
        res << "key_names";
        for (size_t i = 0; i < params.key_names.size(); ++i)
            res << ", " << params.key_names[i];
    }
    else
    {
        res << "keys";
        for (size_t i = 0; i < params.keys.size(); ++i)
            res << ", " << params.keys[i];
    }

    res << ", aggregates";
    for (size_t i = 0; i < params.aggregates_size; ++i)
        res << ", " << params.aggregates[i].column_name;

    return res.str();
}

void Aggregator::setCancellationHook(const CancellationHook cancellation_hook)
{
    isCancelled = cancellation_hook;
}

}