2011-09-19 01:42:16 +00:00
|
|
|
|
#pragma once
|
|
|
|
|
|
2011-09-26 07:25:22 +00:00
|
|
|
|
#include <map>
|
|
|
|
|
#include <tr1/unordered_map>
|
|
|
|
|
|
2012-03-05 07:58:34 +00:00
|
|
|
|
#include <Poco/Mutex.h>
|
|
|
|
|
|
2012-09-05 19:51:09 +00:00
|
|
|
|
#include <Yandex/logger_useful.h>
|
|
|
|
|
|
2011-09-19 01:42:16 +00:00
|
|
|
|
#include <DB/Core/ColumnNumbers.h>
|
2011-09-24 20:32:41 +00:00
|
|
|
|
#include <DB/Core/Names.h>
|
2012-05-31 05:41:56 +00:00
|
|
|
|
#include <DB/Core/StringRef.h>
|
2013-02-08 19:34:44 +00:00
|
|
|
|
#include <DB/Common/Arena.h>
|
2012-05-31 05:41:56 +00:00
|
|
|
|
|
2011-09-19 01:42:16 +00:00
|
|
|
|
#include <DB/DataStreams/IBlockInputStream.h>
|
2012-05-31 05:41:56 +00:00
|
|
|
|
|
2011-09-19 01:42:16 +00:00
|
|
|
|
#include <DB/AggregateFunctions/IAggregateFunction.h>
|
2012-05-31 05:41:56 +00:00
|
|
|
|
|
2012-08-23 23:49:28 +00:00
|
|
|
|
#include <DB/Interpreters/AggregationCommon.h>
|
2012-12-25 19:28:59 +00:00
|
|
|
|
#include <DB/Interpreters/Limits.h>
|
2011-09-19 01:42:16 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
|
{
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
struct AggregateDescription
|
|
|
|
|
{
|
|
|
|
|
AggregateFunctionPtr function;
|
|
|
|
|
ColumnNumbers arguments;
|
2011-09-24 20:32:41 +00:00
|
|
|
|
Names argument_names; /// Используются, если arguments не заданы.
|
2011-09-25 03:37:09 +00:00
|
|
|
|
String column_name; /// Какое имя использовать для столбца со значениями агрегатной функции
|
2011-09-19 01:42:16 +00:00
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
typedef std::vector<AggregateDescription> AggregateDescriptions;
|
|
|
|
|
|
2011-09-26 07:25:22 +00:00
|
|
|
|
|
2011-12-19 08:06:31 +00:00
|
|
|
|
/** Разные структуры данных, которые могут использоваться для агрегации
|
2013-02-08 19:34:44 +00:00
|
|
|
|
* Для эффективности сами данные для агрегации кладутся в пул.
|
|
|
|
|
* Владение данными (состояний агрегатных функций) и пулом
|
|
|
|
|
* захватывается позднее - в функции ConvertToBlock, объектом ColumnAggregateFunction.
|
2011-12-19 08:06:31 +00:00
|
|
|
|
*/
|
2013-02-08 19:34:44 +00:00
|
|
|
|
typedef std::map<Row, AggregateDataPtr> AggregatedData;
|
|
|
|
|
typedef AggregateDataPtr AggregatedDataWithoutKey;
|
|
|
|
|
typedef HashMap<UInt64, AggregateDataPtr> AggregatedDataWithUInt64Key;
|
|
|
|
|
typedef HashMap<StringRef, AggregateDataPtr, StringRefHash, StringRefZeroTraits> AggregatedDataWithStringKey;
|
2013-02-09 02:20:26 +00:00
|
|
|
|
typedef HashMap<UInt128, std::pair<Field*, AggregateDataPtr>, UInt128Hash, UInt128ZeroTraits> AggregatedDataHashed;
|
2011-09-26 07:25:22 +00:00
|
|
|
|
|
2013-02-16 18:59:05 +00:00
|
|
|
|
class Aggregator;
|
|
|
|
|
|
2011-09-26 07:25:22 +00:00
|
|
|
|
|
|
|
|
|
struct AggregatedDataVariants
|
|
|
|
|
{
|
2013-02-16 18:59:05 +00:00
|
|
|
|
/** Работа с состояниями агрегатных функций в пуле устроена следующим (неудобным) образом:
|
|
|
|
|
* - при агрегации, состояния создаются в пуле с помощью функции IAggregateFunction::create (внутри - placement new произвольной структуры);
|
|
|
|
|
* - они должны быть затем уничтожены с помощью IAggregateFunction::destroy (внутри - вызов деструктора произвольной структуры);
|
|
|
|
|
* - если агрегация завершена, то, в функции Aggregator::convertToBlock, указатели на состояния агрегатных функций
|
|
|
|
|
* записываются в ColumnAggregateFunction; ColumnAggregateFunction "захватывает владение" ими, то есть - вызывает destroy в своём деструкторе.
|
|
|
|
|
* - если при агрегации, до вызова Aggregator::convertToBlock вылетело исключение, то состояния агрегатных функций всё-равно должны быть уничтожены,
|
|
|
|
|
* иначе для сложных состояний (наприемер, AggregateFunctionUniq), будут утечки памяти;
|
|
|
|
|
* - чтобы, в этом случае, уничтожить состояния, в деструкторе вызывается метод Aggregator::destroyAggregateStates,
|
|
|
|
|
* но только если переменная aggregator (см. ниже) не NULL;
|
|
|
|
|
* - то есть, пока вы не передали владение состояниями агрегатных функций в ColumnAggregateFunction, установите переменную aggregator,
|
|
|
|
|
* чтобы при возникновении исключения, состояния были корректно уничтожены.
|
|
|
|
|
*
|
|
|
|
|
* PS. Это можно исправить, сделав пул, который знает о том, какие состояния агрегатных функций и в каком порядке в него уложены, и умеет сам их уничтожать.
|
|
|
|
|
* Но это вряд ли можно просто сделать, так как в этот же пул планируется класть строки переменной длины.
|
|
|
|
|
* В этом случае, пул не сможет знать, по каким смещениям хранятся объекты.
|
|
|
|
|
*/
|
|
|
|
|
Aggregator * aggregator;
|
|
|
|
|
|
2013-02-09 00:12:04 +00:00
|
|
|
|
/// Пулы для состояний агрегатных функций. Владение потом будет передано в ColumnAggregateFunction.
|
|
|
|
|
Arenas aggregates_pools;
|
2013-02-16 18:59:05 +00:00
|
|
|
|
Arena * aggregates_pool; /// Пул, который сейчас используется для аллокации.
|
2013-02-08 20:34:30 +00:00
|
|
|
|
|
2011-09-26 15:45:31 +00:00
|
|
|
|
/// Наиболее общий вариант. Самый медленный. На данный момент, не используется.
|
2011-09-26 07:25:22 +00:00
|
|
|
|
AggregatedData generic;
|
2011-09-26 15:45:31 +00:00
|
|
|
|
|
|
|
|
|
/// Специализация для случая, когда ключи отсутствуют.
|
2011-09-26 07:25:22 +00:00
|
|
|
|
AggregatedDataWithoutKey without_key;
|
2011-09-26 15:45:31 +00:00
|
|
|
|
|
2013-02-16 20:15:45 +00:00
|
|
|
|
/// Специализация для случая, когда есть один числовой ключ.
|
2011-09-26 07:25:22 +00:00
|
|
|
|
AggregatedDataWithUInt64Key key64;
|
2011-09-26 15:45:31 +00:00
|
|
|
|
|
|
|
|
|
/// Специализация для случая, когда есть один строковый ключ.
|
2011-09-26 15:22:25 +00:00
|
|
|
|
AggregatedDataWithStringKey key_string;
|
2013-02-08 19:34:44 +00:00
|
|
|
|
Arena string_pool;
|
2011-09-26 15:45:31 +00:00
|
|
|
|
|
|
|
|
|
/** Агрегирует по 128 битному хэшу от ключа.
|
|
|
|
|
* Если все ключи фиксированной длины, влезающие целиком в 128 бит, то укладывает их без изменений в 128 бит.
|
2012-10-07 06:30:10 +00:00
|
|
|
|
* Иначе - вычисляет SipHash от набора из всех ключей.
|
2011-09-28 05:24:38 +00:00
|
|
|
|
* (При этом, строки, содержащие нули посередине, могут склеиться.)
|
2011-09-26 15:45:31 +00:00
|
|
|
|
*/
|
2011-09-26 07:25:22 +00:00
|
|
|
|
AggregatedDataHashed hashed;
|
2013-02-09 02:20:26 +00:00
|
|
|
|
Arena keys_pool; // TODO: складывать ключи в пул не в виде Field, а в виде плоского набора байт.
|
|
|
|
|
size_t keys_size;
|
2012-02-27 06:28:20 +00:00
|
|
|
|
|
|
|
|
|
enum Type
|
|
|
|
|
{
|
2012-05-10 07:47:13 +00:00
|
|
|
|
EMPTY = 0,
|
|
|
|
|
GENERIC = 1,
|
|
|
|
|
WITHOUT_KEY = 2,
|
|
|
|
|
KEY_64 = 3,
|
|
|
|
|
KEY_STRING = 4,
|
|
|
|
|
HASHED = 5,
|
2012-02-27 06:28:20 +00:00
|
|
|
|
};
|
|
|
|
|
Type type;
|
2012-05-10 07:47:13 +00:00
|
|
|
|
|
2013-02-16 18:59:05 +00:00
|
|
|
|
AggregatedDataVariants() : aggregator(NULL), aggregates_pools(1, new Arena), aggregates_pool(&*aggregates_pools.back()), without_key(NULL), type(EMPTY) {}
|
2012-12-25 19:28:59 +00:00
|
|
|
|
bool empty() const { return type == EMPTY; }
|
|
|
|
|
|
2013-02-16 18:59:05 +00:00
|
|
|
|
~AggregatedDataVariants();
|
2013-02-09 02:20:26 +00:00
|
|
|
|
|
2012-12-25 19:28:59 +00:00
|
|
|
|
size_t size() const
|
|
|
|
|
{
|
|
|
|
|
switch (type)
|
|
|
|
|
{
|
|
|
|
|
case EMPTY: return 0;
|
|
|
|
|
case GENERIC: return generic.size();
|
|
|
|
|
case WITHOUT_KEY: return 1;
|
|
|
|
|
case KEY_64: return key64.size();
|
|
|
|
|
case KEY_STRING: return key_string.size();
|
|
|
|
|
case HASHED: return hashed.size();
|
|
|
|
|
|
|
|
|
|
default:
|
|
|
|
|
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
|
|
|
|
|
}
|
|
|
|
|
}
|
2013-02-09 01:02:52 +00:00
|
|
|
|
|
|
|
|
|
const char * getMethodName() const
|
|
|
|
|
{
|
|
|
|
|
switch (type)
|
|
|
|
|
{
|
|
|
|
|
case EMPTY: return "EMPTY";
|
|
|
|
|
case GENERIC: return "GENERIC";
|
|
|
|
|
case WITHOUT_KEY: return "WITHOUT_KEY";
|
|
|
|
|
case KEY_64: return "KEY_64";
|
|
|
|
|
case KEY_STRING: return "KEY_STRING";
|
|
|
|
|
case HASHED: return "HASHED";
|
|
|
|
|
|
|
|
|
|
default:
|
|
|
|
|
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
|
|
|
|
|
}
|
|
|
|
|
}
|
2011-09-26 07:25:22 +00:00
|
|
|
|
};
|
|
|
|
|
|
2012-02-27 06:28:20 +00:00
|
|
|
|
typedef SharedPtr<AggregatedDataVariants> AggregatedDataVariantsPtr;
|
|
|
|
|
typedef std::vector<AggregatedDataVariantsPtr> ManyAggregatedDataVariants;
|
|
|
|
|
|
2011-09-19 01:42:16 +00:00
|
|
|
|
|
2012-03-05 07:58:34 +00:00
|
|
|
|
/** Агрегирует источник блоков.
|
2011-09-19 01:42:16 +00:00
|
|
|
|
*/
|
2011-09-19 03:34:23 +00:00
|
|
|
|
class Aggregator
|
2011-09-19 01:42:16 +00:00
|
|
|
|
{
|
|
|
|
|
public:
|
2012-12-25 19:28:59 +00:00
|
|
|
|
Aggregator(const ColumnNumbers & keys_, AggregateDescriptions & aggregates_,
|
|
|
|
|
size_t max_rows_to_group_by_ = 0, Limits::OverflowMode group_by_overflow_mode_ = Limits::THROW)
|
2013-02-08 20:34:30 +00:00
|
|
|
|
: keys(keys_), aggregates(aggregates_), keys_size(keys.size()), aggregates_size(aggregates.size()),
|
|
|
|
|
total_size_of_aggregate_states(0), initialized(false),
|
2012-12-25 19:28:59 +00:00
|
|
|
|
max_rows_to_group_by(max_rows_to_group_by_), group_by_overflow_mode(group_by_overflow_mode_),
|
|
|
|
|
log(&Logger::get("Aggregator"))
|
2012-09-12 18:49:21 +00:00
|
|
|
|
{
|
|
|
|
|
}
|
2012-05-31 00:33:42 +00:00
|
|
|
|
|
2012-12-25 19:28:59 +00:00
|
|
|
|
Aggregator(const Names & key_names_, AggregateDescriptions & aggregates_,
|
|
|
|
|
size_t max_rows_to_group_by_ = 0, Limits::OverflowMode group_by_overflow_mode_ = Limits::THROW)
|
2013-02-08 20:34:30 +00:00
|
|
|
|
: key_names(key_names_), aggregates(aggregates_), keys_size(key_names.size()), aggregates_size(aggregates.size()),
|
|
|
|
|
total_size_of_aggregate_states(0), initialized(false),
|
2012-12-25 19:28:59 +00:00
|
|
|
|
max_rows_to_group_by(max_rows_to_group_by_), group_by_overflow_mode(group_by_overflow_mode_),
|
|
|
|
|
log(&Logger::get("Aggregator"))
|
2012-09-12 18:49:21 +00:00
|
|
|
|
{
|
|
|
|
|
}
|
2011-09-19 01:42:16 +00:00
|
|
|
|
|
2012-03-05 07:58:34 +00:00
|
|
|
|
/// Агрегировать источник. Получить результат в виде одной из структур данных.
|
2011-09-26 07:25:22 +00:00
|
|
|
|
void execute(BlockInputStreamPtr stream, AggregatedDataVariants & result);
|
2011-09-19 01:42:16 +00:00
|
|
|
|
|
2011-09-19 03:34:23 +00:00
|
|
|
|
/// Получить пример блока, описывающего результат. Следует вызывать только после execute.
|
|
|
|
|
Block getSampleBlock() { return sample; }
|
|
|
|
|
|
2012-02-27 06:28:20 +00:00
|
|
|
|
/// Преобразовать структуру данных агрегации в блок.
|
|
|
|
|
Block convertToBlock(AggregatedDataVariants & data_variants);
|
|
|
|
|
|
2013-02-09 02:20:26 +00:00
|
|
|
|
/** Объединить несколько структур данных агрегации в одну. (В первый элемент массива.) Все варианты агрегации должны быть одинаковыми!
|
|
|
|
|
* После объединения, все стркутуры агрегации (а не только те, в которую они будут слиты) должны жить, пока не будет вызвана функция convertToBlock.
|
|
|
|
|
* Это нужно, так как в слитом результате могут остаться указатели на память в пуле, которым владеют другие структуры агрегации.
|
|
|
|
|
*/
|
2012-02-27 06:28:20 +00:00
|
|
|
|
AggregatedDataVariantsPtr merge(ManyAggregatedDataVariants & data_variants);
|
|
|
|
|
|
2012-05-30 01:38:02 +00:00
|
|
|
|
/** Объединить несколько агрегированных блоков в одну структуру данных.
|
|
|
|
|
* (Доагрегировать несколько блоков, которые представляют собой результат независимых агрегаций.)
|
|
|
|
|
*/
|
|
|
|
|
void merge(BlockInputStreamPtr stream, AggregatedDataVariants & result);
|
|
|
|
|
|
2011-09-19 01:42:16 +00:00
|
|
|
|
private:
|
2013-02-16 18:59:05 +00:00
|
|
|
|
friend struct AggregatedDataVariants;
|
|
|
|
|
|
2011-09-19 01:42:16 +00:00
|
|
|
|
ColumnNumbers keys;
|
2011-09-24 20:32:41 +00:00
|
|
|
|
Names key_names;
|
2011-09-19 01:42:16 +00:00
|
|
|
|
AggregateDescriptions aggregates;
|
2013-02-08 20:34:30 +00:00
|
|
|
|
std::vector<IAggregateFunction *> aggregate_functions;
|
2012-09-12 18:49:21 +00:00
|
|
|
|
size_t keys_size;
|
2013-02-03 18:08:52 +00:00
|
|
|
|
size_t aggregates_size;
|
2011-09-19 03:34:23 +00:00
|
|
|
|
|
2013-02-08 20:34:30 +00:00
|
|
|
|
Sizes offsets_of_aggregate_states; /// Смещение до n-ой агрегатной функции в строке из агрегатных функций.
|
|
|
|
|
size_t total_size_of_aggregate_states; /// Суммарный размер строки из агрегатных функций.
|
|
|
|
|
|
2012-03-05 07:58:34 +00:00
|
|
|
|
/// Для инициализации от первого блока при конкуррентном использовании.
|
|
|
|
|
bool initialized;
|
|
|
|
|
Poco::FastMutex mutex;
|
|
|
|
|
|
2012-12-25 19:28:59 +00:00
|
|
|
|
size_t max_rows_to_group_by;
|
|
|
|
|
Limits::OverflowMode group_by_overflow_mode;
|
|
|
|
|
|
2011-09-19 03:34:23 +00:00
|
|
|
|
Block sample;
|
2012-03-05 07:58:34 +00:00
|
|
|
|
|
2012-05-31 00:33:42 +00:00
|
|
|
|
Logger * log;
|
|
|
|
|
|
2012-03-05 07:58:34 +00:00
|
|
|
|
/** Если заданы только имена столбцов (key_names, а также aggregates[i].column_name), то вычислить номера столбцов.
|
|
|
|
|
* Сформировать блок - пример результата.
|
|
|
|
|
*/
|
|
|
|
|
void initialize(Block & block);
|
2012-05-30 01:38:02 +00:00
|
|
|
|
|
|
|
|
|
/** Выбрать способ агрегации на основе количества и типов ключей. */
|
2013-01-08 19:41:22 +00:00
|
|
|
|
AggregatedDataVariants::Type chooseAggregationMethod(const ConstColumnPlainPtrs & key_columns, bool & keys_fit_128_bits, Sizes & key_sizes);
|
2013-02-16 18:59:05 +00:00
|
|
|
|
|
|
|
|
|
/** Вызвать методы destroy для состояний агрегатных функций.
|
|
|
|
|
* Используется в обработчике исключений при агрегации, так как RAII в данном случае не применим.
|
|
|
|
|
*/
|
|
|
|
|
void destroyAggregateStates(AggregatedDataVariants & result);
|
2011-09-19 01:42:16 +00:00
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
}
|