ClickHouse/dbms/include/DB/Interpreters/Aggregator.h

161 lines
6.3 KiB
C
Raw Normal View History

2011-09-19 01:42:16 +00:00
#pragma once
2011-09-26 15:45:31 +00:00
#include <city.h>
2011-09-26 07:25:22 +00:00
#include <map>
#include <tr1/unordered_map>
2012-03-05 07:58:34 +00:00
#include <Poco/Mutex.h>
2011-09-19 01:42:16 +00:00
#include <DB/Core/ColumnNumbers.h>
2011-09-24 20:32:41 +00:00
#include <DB/Core/Names.h>
2011-09-19 01:42:16 +00:00
#include <DB/DataStreams/IBlockInputStream.h>
#include <DB/AggregateFunctions/IAggregateFunction.h>
2011-12-12 10:06:17 +00:00
#include <DB/Interpreters/HashMap.h>
2011-09-19 01:42:16 +00:00
namespace DB
{
struct AggregateDescription
{
AggregateFunctionPtr function;
ColumnNumbers arguments;
2011-09-24 20:32:41 +00:00
Names argument_names; /// Используются, если arguments не заданы.
2011-09-25 03:37:09 +00:00
String column_name; /// Какое имя использовать для столбца со значениями агрегатной функции
2011-09-19 01:42:16 +00:00
};
typedef std::vector<AggregateDescription> AggregateDescriptions;
2011-09-26 07:25:22 +00:00
/// Для агрегации по md5.
struct UInt128
{
UInt64 first;
UInt64 second;
bool operator== (const UInt128 rhs) const { return first == rhs.first && second == rhs.second; }
2011-12-12 10:06:17 +00:00
bool operator!= (const UInt128 rhs) const { return first != rhs.first || second != rhs.second; }
2011-09-26 07:25:22 +00:00
};
struct UInt128Hash
{
2011-12-19 07:00:15 +00:00
default_hash<UInt64> hash64;
size_t operator()(UInt128 x) const { return hash64(x.first ^ 0xB15652B8790A0D36ULL) ^ hash64(x.second); }
2011-09-26 07:25:22 +00:00
};
2011-12-12 10:06:17 +00:00
struct UInt128ZeroTraits
{
static inline bool check(UInt128 x) { return x.first == 0 && x.second == 0; }
static inline void set(UInt128 & x) { x.first = 0; x.second = 0; }
};
2011-09-26 07:25:22 +00:00
2011-09-26 15:45:31 +00:00
/// Немного быстрее стандартного
struct StringHash
{
size_t operator()(const String & x) const { return CityHash64(x.data(), x.size()); }
};
2011-12-19 08:06:31 +00:00
/** Разные структуры данных, которые могут использоваться для агрегации
* Для эффективности используются "голые" указатели на IAggregateFunction,
* владение будет захвачено после агрегации (в AggregatingBlockInputStream).
*/
typedef std::map<Row, AggregateFunctionsPlainPtrs> AggregatedData;
typedef AggregateFunctionsPlainPtrs AggregatedDataWithoutKey;
typedef HashMap<UInt64, AggregateFunctionsPlainPtrs> AggregatedDataWithUInt64Key;
typedef std::tr1::unordered_map<String, AggregateFunctionsPlainPtrs, StringHash> AggregatedDataWithStringKey;
typedef HashMap<UInt128, std::pair<Row, AggregateFunctionsPlainPtrs>, UInt128Hash, UInt128ZeroTraits> AggregatedDataHashed;
2011-09-26 07:25:22 +00:00
struct AggregatedDataVariants
{
2011-09-26 15:45:31 +00:00
/// Наиболее общий вариант. Самый медленный. На данный момент, не используется.
2011-09-26 07:25:22 +00:00
AggregatedData generic;
2011-09-26 15:45:31 +00:00
/// Специализация для случая, когда ключи отсутствуют.
2011-09-26 07:25:22 +00:00
AggregatedDataWithoutKey without_key;
2011-09-26 15:45:31 +00:00
/// Специализация для случая, когда есть один числовой ключ (не с плавающей запятой).
2011-09-26 07:25:22 +00:00
AggregatedDataWithUInt64Key key64;
2011-09-26 15:45:31 +00:00
/// Специализация для случая, когда есть один строковый ключ.
2011-09-26 15:22:25 +00:00
AggregatedDataWithStringKey key_string;
2011-09-26 15:45:31 +00:00
/** Агрегирует по 128 битному хэшу от ключа.
* Если все ключи фиксированной длины, влезающие целиком в 128 бит, то укладывает их без изменений в 128 бит.
* Иначе - вычисляет md5 от набора из всех ключей.
2011-09-28 05:24:38 +00:00
* (При этом, строки, содержащие нули посередине, могут склеиться.)
2011-09-26 15:45:31 +00:00
*/
2011-09-26 07:25:22 +00:00
AggregatedDataHashed hashed;
2012-02-27 06:28:20 +00:00
enum Type
{
2012-05-10 07:47:13 +00:00
EMPTY = 0,
GENERIC = 1,
WITHOUT_KEY = 2,
KEY_64 = 3,
KEY_STRING = 4,
HASHED = 5,
2012-02-27 06:28:20 +00:00
};
Type type;
2012-05-10 07:47:13 +00:00
AggregatedDataVariants() : type(EMPTY) {}
bool empty() { return type == EMPTY; }
2011-09-26 07:25:22 +00:00
};
2012-02-27 06:28:20 +00:00
typedef SharedPtr<AggregatedDataVariants> AggregatedDataVariantsPtr;
typedef std::vector<AggregatedDataVariantsPtr> ManyAggregatedDataVariants;
2011-09-19 01:42:16 +00:00
2012-03-05 07:58:34 +00:00
/** Агрегирует источник блоков.
2011-09-19 01:42:16 +00:00
*/
2011-09-19 03:34:23 +00:00
class Aggregator
2011-09-19 01:42:16 +00:00
{
public:
2012-03-05 07:58:34 +00:00
Aggregator(const ColumnNumbers & keys_, AggregateDescriptions & aggregates_) : keys(keys_), aggregates(aggregates_), initialized(false) {};
Aggregator(const Names & key_names_, AggregateDescriptions & aggregates_) : key_names(key_names_), aggregates(aggregates_), initialized(false) {};
2011-09-19 01:42:16 +00:00
2012-03-05 07:58:34 +00:00
/// Агрегировать источник. Получить результат в виде одной из структур данных.
2011-09-26 07:25:22 +00:00
void execute(BlockInputStreamPtr stream, AggregatedDataVariants & result);
2011-09-19 01:42:16 +00:00
2011-09-19 03:34:23 +00:00
/// Получить пример блока, описывающего результат. Следует вызывать только после execute.
Block getSampleBlock() { return sample; }
2012-02-27 06:28:20 +00:00
/// Преобразовать структуру данных агрегации в блок.
Block convertToBlock(AggregatedDataVariants & data_variants);
2012-05-30 01:38:02 +00:00
/// Объединить несколько структур данных агрегации в одну. (В первый элемент массива.) Все варианты агрегации должны быть одинаковыми!
2012-02-27 06:28:20 +00:00
AggregatedDataVariantsPtr merge(ManyAggregatedDataVariants & data_variants);
2012-05-30 01:38:02 +00:00
/** Объединить несколько агрегированных блоков в одну структуру данных.
* (Доагрегировать несколько блоков, которые представляют собой результат независимых агрегаций.)
*/
void merge(BlockInputStreamPtr stream, AggregatedDataVariants & result);
2011-09-19 01:42:16 +00:00
private:
ColumnNumbers keys;
2011-09-24 20:32:41 +00:00
Names key_names;
2011-09-19 01:42:16 +00:00
AggregateDescriptions aggregates;
2011-09-19 03:34:23 +00:00
2012-03-05 07:58:34 +00:00
/// Для инициализации от первого блока при конкуррентном использовании.
bool initialized;
Poco::FastMutex mutex;
2011-09-19 03:34:23 +00:00
Block sample;
2012-03-05 07:58:34 +00:00
/** Если заданы только имена столбцов (key_names, а также aggregates[i].column_name), то вычислить номера столбцов.
* Сформировать блок - пример результата.
*/
void initialize(Block & block);
2012-05-30 01:38:02 +00:00
/** Выбрать способ агрегации на основе количества и типов ключей. */
typedef std::vector<size_t> Sizes;
AggregatedDataVariants::Type chooseAggregationMethod(Columns & key_columns, bool & keys_fit_128_bits, Sizes & key_sizes);
2011-09-19 01:42:16 +00:00
};
}