ClickHouse/dbms/include/DB/Interpreters/SpecializedAggregator.h
2015-01-13 06:03:45 +03:00

292 lines
9.9 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include <DB/Interpreters/Aggregator.h>
#include <DB/AggregateFunctions/AggregateFunctionCount.h>
#include <DB/AggregateFunctions/AggregateFunctionSum.h>
#include <DB/AggregateFunctions/AggregateFunctionAvg.h>
#include <DB/AggregateFunctions/AggregateFunctionsMinMaxAny.h>
#include <DB/AggregateFunctions/AggregateFunctionsArgMinMax.h>
#include <DB/AggregateFunctions/AggregateFunctionUniq.h>
#include <DB/AggregateFunctions/AggregateFunctionUniqUpTo.h>
#include <DB/AggregateFunctions/AggregateFunctionGroupArray.h>
#include <DB/AggregateFunctions/AggregateFunctionGroupUniqArray.h>
#include <DB/AggregateFunctions/AggregateFunctionQuantile.h>
#include <DB/AggregateFunctions/AggregateFunctionQuantileTiming.h>
#include <DB/AggregateFunctions/AggregateFunctionIf.h>
#include <DB/AggregateFunctions/AggregateFunctionArray.h>
#include <DB/AggregateFunctions/AggregateFunctionState.h>
#include <DB/AggregateFunctions/AggregateFunctionMerge.h>
namespace DB
{
/** Шаблон цикла агрегации, позволяющий сгенерировать специализированный вариант для конкретной комбинации агрегатных функций.
* Отличается от обычного тем, что вызовы агрегатных функций должны инлайниться, а цикл обновления агрегатных функций должен развернуться.
*
* Так как возможных комбинаций слишком много, то не представляется возможным сгенерировать их все заранее.
* Этот шаблон предназначен для того, чтобы инстанцировать его в рантайме,
* путём запуска компилятора, компиляции shared library и использования её с помощью dlopen.
*/
/** Список типов - для удобного перечисления агрегатных функций.
*/
template <typename THead, typename... TTail>
struct TypeList
{
using Head = THead;
using Tail = TypeList<TTail...>;
static constexpr size_t size = 1 + sizeof...(TTail);
template <size_t I>
using At = typename std::template conditional<I == 0, Head, typename Tail::template At<I - 1>>::type;
template <typename Func, size_t index = 0>
static void ALWAYS_INLINE forEach(Func && func)
{
func.template operator()<Head, index>();
Tail::template forEach<Func, index + 1>(std::forward<Func>(func));
}
};
template <typename THead>
struct TypeList<THead>
{
using Head = THead;
static constexpr size_t size = 1;
template <size_t I>
using At = typename std::template conditional<I == 0, Head, std::nullptr_t>::type;
template <typename Func, size_t index = 0>
static void ALWAYS_INLINE forEach(Func && func)
{
func.template operator()<Head, index>();
}
};
struct EmptyTypeList
{
static constexpr size_t size = 0;
template <size_t I>
using At = std::nullptr_t;
template <typename Func, size_t index = 0>
static void forEach(Func && func)
{
}
};
struct AggregateFunctionsUpdater
{
AggregateFunctionsUpdater(
const Aggregator::AggregateFunctionsPlainPtrs & aggregate_functions_,
const Sizes & offsets_of_aggregate_states_,
Aggregator::AggregateColumns & aggregate_columns_,
AggregateDataPtr & value_,
size_t row_num_)
: aggregate_functions(aggregate_functions_),
offsets_of_aggregate_states(offsets_of_aggregate_states_),
aggregate_columns(aggregate_columns_),
value(value_), row_num(row_num_)
{
}
template <typename AggregateFunction, size_t column_num>
void operator()() ALWAYS_INLINE;
const Aggregator::AggregateFunctionsPlainPtrs & aggregate_functions;
const Sizes & offsets_of_aggregate_states;
Aggregator::AggregateColumns & aggregate_columns;
AggregateDataPtr & value;
size_t row_num;
};
template <typename AggregateFunction, size_t column_num>
void AggregateFunctionsUpdater::operator()()
{
static_cast<AggregateFunction *>(aggregate_functions[column_num])->add(
value + offsets_of_aggregate_states[column_num],
&aggregate_columns[column_num][0],
row_num);
}
struct AggregateFunctionsCreator
{
AggregateFunctionsCreator(
const Aggregator::AggregateFunctionsPlainPtrs & aggregate_functions_,
const Sizes & offsets_of_aggregate_states_,
Aggregator::AggregateColumns & aggregate_columns_,
AggregateDataPtr & aggregate_data_)
: aggregate_functions(aggregate_functions_),
offsets_of_aggregate_states(offsets_of_aggregate_states_),
aggregate_data(aggregate_data_)
{
}
template <typename AggregateFunction, size_t column_num>
void operator()() ALWAYS_INLINE;
const Aggregator::AggregateFunctionsPlainPtrs & aggregate_functions;
const Sizes & offsets_of_aggregate_states;
AggregateDataPtr & aggregate_data;
};
template <typename AggregateFunction, size_t column_num>
void AggregateFunctionsCreator::operator()()
{
AggregateFunction * func = static_cast<AggregateFunction *>(aggregate_functions[column_num]);
try
{
/** Может возникнуть исключение при нехватке памяти.
* Для того, чтобы потом всё правильно уничтожилось, "откатываем" часть созданных состояний.
* Код не очень удобный.
*/
func->create(aggregate_data + offsets_of_aggregate_states[column_num]);
}
catch (...)
{
for (size_t rollback_j = 0; rollback_j < column_num; ++rollback_j)
func->destroy(aggregate_data + offsets_of_aggregate_states[rollback_j]);
aggregate_data = nullptr;
throw;
}
}
template <typename Method, typename AggregateFunctionsList>
void NO_INLINE Aggregator::executeSpecialized(
Method & method,
Arena * aggregates_pool,
size_t rows,
ConstColumnPlainPtrs & key_columns,
AggregateColumns & aggregate_columns,
const Sizes & key_sizes,
StringRefs & keys,
bool no_more_keys,
AggregateDataPtr overflow_row) const
{
typename Method::State state;
state.init(key_columns);
if (!no_more_keys)
executeSpecializedCase<false, Method, AggregateFunctionsList>(
method, state, aggregates_pool, rows, key_columns, aggregate_columns, key_sizes, keys, overflow_row);
else
executeSpecializedCase<true, Method, AggregateFunctionsList>(
method, state, aggregates_pool, rows, key_columns, aggregate_columns, key_sizes, keys, overflow_row);
}
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wuninitialized"
template <bool no_more_keys, typename Method, typename AggregateFunctionsList>
void NO_INLINE Aggregator::executeSpecializedCase(
Method & method,
typename Method::State & state,
Arena * aggregates_pool,
size_t rows,
ConstColumnPlainPtrs & key_columns,
AggregateColumns & aggregate_columns,
const Sizes & key_sizes,
StringRefs & keys,
AggregateDataPtr overflow_row) const
{
/// Для всех строчек.
typename Method::iterator it;
typename Method::Key prev_key;
for (size_t i = 0; i < rows; ++i)
{
bool inserted; /// Вставили новый ключ, или такой ключ уже был?
bool overflow = false; /// Новый ключ не поместился в хэш-таблицу из-за no_more_keys.
/// Получаем ключ для вставки в хэш-таблицу.
typename Method::Key key = state.getKey(key_columns, keys_size, i, key_sizes, keys);
if (!no_more_keys) /// Вставляем.
{
/// Оптимизация для часто повторяющихся ключей.
if (i != 0 && key == prev_key)
{
AggregateDataPtr value = Method::getAggregateData(it->second);
/// Добавляем значения в агрегатные функции.
AggregateFunctionsList::forEach(AggregateFunctionsUpdater(
aggregate_functions, offsets_of_aggregate_states, aggregate_columns, value, i));
continue;
}
else
prev_key = key;
method.data.emplace(key, it, inserted);
}
else
{
/// Будем добавлять только если ключ уже есть.
inserted = false;
it = method.data.find(key);
if (method.data.end() == it)
overflow = true;
}
/// Если ключ не поместился, и данные не надо агрегировать в отдельную строку, то делать нечего.
if (no_more_keys && overflow && !overflow_row)
continue;
/// Если вставили новый ключ - инициализируем состояния агрегатных функций, и возможно, что-нибудь связанное с ключом.
if (inserted)
{
method.onNewKey(*it, keys_size, i, keys, *aggregates_pool);
AggregateDataPtr & aggregate_data = Method::getAggregateData(it->second);
aggregate_data = aggregates_pool->alloc(total_size_of_aggregate_states);
AggregateFunctionsList::forEach(AggregateFunctionsCreator(
aggregate_functions, offsets_of_aggregate_states, aggregate_columns, aggregate_data));
}
AggregateDataPtr value = (!no_more_keys || !overflow) ? Method::getAggregateData(it->second) : overflow_row;
/// Добавляем значения в агрегатные функции.
AggregateFunctionsList::forEach(AggregateFunctionsUpdater(
aggregate_functions, offsets_of_aggregate_states, aggregate_columns, value, i));
}
}
#pragma GCC diagnostic pop
template <typename AggregateFunctionsList>
void NO_INLINE Aggregator::executeSpecializedWithoutKey(
AggregatedDataWithoutKey & res,
size_t rows,
AggregateColumns & aggregate_columns) const
{
/// Оптимизация в случае единственной агрегатной функции count.
AggregateFunctionCount * agg_count = aggregates_size == 1
? typeid_cast<AggregateFunctionCount *>(aggregate_functions[0])
: NULL;
if (agg_count)
agg_count->addDelta(res, rows);
else
{
for (size_t i = 0; i < rows; ++i)
{
AggregateFunctionsList::forEach(AggregateFunctionsUpdater(
aggregate_functions, offsets_of_aggregate_states, aggregate_columns, res, i));
}
}
}
}