Merge pull request #3886 from yandex/low-cardinality-specialized-aggregation

Added LowCardinality support for specialized aggregation.
This commit is contained in:
alexey-milovidov 2018-12-20 20:46:49 +03:00 committed by GitHub
commit 4993cb53d1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 36 additions and 19 deletions

View File

@ -615,7 +615,6 @@ void NO_INLINE Aggregator::executeImplCase(
AggregateDataPtr overflow_row) const
{
/// NOTE When editing this code, also pay attention to SpecializedAggregator.h.
/// TODO for low cardinality optimization.
/// For all rows.
typename Method::Key prev_key;

View File

@ -108,6 +108,9 @@ void NO_INLINE Aggregator::executeSpecialized(
AggregateDataPtr overflow_row) const
{
typename Method::State state;
if constexpr (Method::low_cardinality_optimization)
state.init(key_columns, aggregation_state_cache);
else
state.init(key_columns);
if (!no_more_keys)
@ -133,15 +136,19 @@ void NO_INLINE Aggregator::executeSpecializedCase(
AggregateDataPtr overflow_row) const
{
/// For all rows.
typename Method::iterator it;
typename Method::Key prev_key;
AggregateDataPtr value = nullptr;
for (size_t i = 0; i < rows; ++i)
{
bool inserted; /// Inserted a new key, or was this key already?
bool overflow = false; /// New key did not fit in the hash table because of no_more_keys.
bool inserted = false; /// Inserted a new key, or was this key already?
/// Get the key to insert into the hash table.
typename Method::Key key = state.getKey(key_columns, params.keys_size, i, key_sizes, keys, *aggregates_pool);
typename Method::Key key;
if constexpr (!Method::low_cardinality_optimization)
key = state.getKey(key_columns, params.keys_size, i, key_sizes, keys, *aggregates_pool);
AggregateDataPtr * aggregate_data = nullptr;
typename Method::iterator it; /// Is not used if Method::low_cardinality_optimization
if (!no_more_keys) /// Insert.
{
@ -150,8 +157,6 @@ void NO_INLINE Aggregator::executeSpecializedCase(
{
if (i != 0 && key == prev_key)
{
AggregateDataPtr value = Method::getAggregateData(it->second);
/// Add values into aggregate functions.
AggregateFunctionsList::forEach(AggregateFunctionsUpdater(
aggregate_functions, offsets_of_aggregate_states, aggregate_columns, value, i, aggregates_pool));
@ -163,19 +168,29 @@ void NO_INLINE Aggregator::executeSpecializedCase(
prev_key = key;
}
if constexpr (Method::low_cardinality_optimization)
aggregate_data = state.emplaceKeyFromRow(method.data, i, inserted, params.keys_size, keys, *aggregates_pool);
else
{
method.data.emplace(key, it, inserted);
aggregate_data = &Method::getAggregateData(it->second);
}
}
else
{
/// Add only if the key already exists.
inserted = false;
if constexpr (Method::low_cardinality_optimization)
aggregate_data = state.findFromRow(method.data, i);
else
{
it = method.data.find(key);
if (method.data.end() == it)
overflow = true;
if (method.data.end() != it)
aggregate_data = &Method::getAggregateData(it->second);
}
}
/// If the key does not fit, and the data does not need to be aggregated in a separate row, then there's nothing to do.
if (no_more_keys && overflow && !overflow_row)
if (!aggregate_data && !overflow_row)
{
method.onExistingKey(key, keys, *aggregates_pool);
continue;
@ -184,9 +199,9 @@ void NO_INLINE Aggregator::executeSpecializedCase(
/// If a new key is inserted, initialize the states of the aggregate functions, and possibly some stuff related to the key.
if (inserted)
{
AggregateDataPtr & aggregate_data = Method::getAggregateData(it->second);
aggregate_data = nullptr;
*aggregate_data = nullptr;
if constexpr (!Method::low_cardinality_optimization)
method.onNewKey(*it, params.keys_size, keys, *aggregates_pool);
AggregateDataPtr place = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
@ -194,12 +209,15 @@ void NO_INLINE Aggregator::executeSpecializedCase(
AggregateFunctionsList::forEach(AggregateFunctionsCreator(
aggregate_functions, offsets_of_aggregate_states, place));
aggregate_data = place;
*aggregate_data = place;
if constexpr (Method::low_cardinality_optimization)
state.cacheAggregateData(i, place);
}
else
method.onExistingKey(key, keys, *aggregates_pool);
AggregateDataPtr value = (!no_more_keys || !overflow) ? Method::getAggregateData(it->second) : overflow_row;
value = aggregate_data ? *aggregate_data : overflow_row;
/// Add values into the aggregate functions.
AggregateFunctionsList::forEach(AggregateFunctionsUpdater(