2011-09-19 01:42:16 +00:00
|
|
|
|
#pragma once
|
|
|
|
|
|
2011-09-26 07:25:22 +00:00
|
|
|
|
#include <map>
|
2014-01-08 16:33:28 +00:00
|
|
|
|
#include <unordered_map>
|
2011-09-26 07:25:22 +00:00
|
|
|
|
|
2012-03-05 07:58:34 +00:00
|
|
|
|
#include <Poco/Mutex.h>
|
|
|
|
|
|
2012-09-05 19:51:09 +00:00
|
|
|
|
#include <Yandex/logger_useful.h>
|
|
|
|
|
|
2011-09-19 01:42:16 +00:00
|
|
|
|
#include <DB/Core/ColumnNumbers.h>
|
2011-09-24 20:32:41 +00:00
|
|
|
|
#include <DB/Core/Names.h>
|
2012-05-31 05:41:56 +00:00
|
|
|
|
#include <DB/Core/StringRef.h>
|
2013-02-08 19:34:44 +00:00
|
|
|
|
#include <DB/Common/Arena.h>
|
2012-05-31 05:41:56 +00:00
|
|
|
|
|
2011-09-19 01:42:16 +00:00
|
|
|
|
#include <DB/DataStreams/IBlockInputStream.h>
|
2012-05-31 05:41:56 +00:00
|
|
|
|
|
2011-09-19 01:42:16 +00:00
|
|
|
|
#include <DB/AggregateFunctions/IAggregateFunction.h>
|
2012-05-31 05:41:56 +00:00
|
|
|
|
|
2012-08-23 23:49:28 +00:00
|
|
|
|
#include <DB/Interpreters/AggregationCommon.h>
|
2012-12-25 19:28:59 +00:00
|
|
|
|
#include <DB/Interpreters/Limits.h>
|
2011-09-19 01:42:16 +00:00
|
|
|
|
|
2014-05-10 00:31:22 +00:00
|
|
|
|
#include <DB/Columns/ColumnString.h>
|
|
|
|
|
#include <DB/Columns/ColumnFixedString.h>
|
|
|
|
|
#include <DB/Columns/ColumnAggregateFunction.h>
|
2014-10-29 01:18:50 +00:00
|
|
|
|
#include <DB/Columns/ColumnVector.h>
|
2014-05-10 00:31:22 +00:00
|
|
|
|
|
|
|
|
|
|
2011-09-19 01:42:16 +00:00
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
|
{
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
struct AggregateDescription
|
|
|
|
|
{
|
|
|
|
|
AggregateFunctionPtr function;
|
2014-03-25 18:16:26 +00:00
|
|
|
|
Array parameters; /// Параметры (параметрической) агрегатной функции.
|
2011-09-19 01:42:16 +00:00
|
|
|
|
ColumnNumbers arguments;
|
2011-09-24 20:32:41 +00:00
|
|
|
|
Names argument_names; /// Используются, если arguments не заданы.
|
2011-09-25 03:37:09 +00:00
|
|
|
|
String column_name; /// Какое имя использовать для столбца со значениями агрегатной функции
|
2011-09-19 01:42:16 +00:00
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
typedef std::vector<AggregateDescription> AggregateDescriptions;
|
|
|
|
|
|
2011-09-26 07:25:22 +00:00
|
|
|
|
|
2011-12-19 08:06:31 +00:00
|
|
|
|
/** Разные структуры данных, которые могут использоваться для агрегации
|
2013-02-08 19:34:44 +00:00
|
|
|
|
* Для эффективности сами данные для агрегации кладутся в пул.
|
|
|
|
|
* Владение данными (состояний агрегатных функций) и пулом
|
2014-02-09 21:22:54 +00:00
|
|
|
|
* захватывается позднее - в функции convertToBlock, объектом ColumnAggregateFunction.
|
2011-12-19 08:06:31 +00:00
|
|
|
|
*/
|
2013-02-08 19:34:44 +00:00
|
|
|
|
typedef AggregateDataPtr AggregatedDataWithoutKey;
|
|
|
|
|
typedef HashMap<UInt64, AggregateDataPtr> AggregatedDataWithUInt64Key;
|
2014-05-10 02:42:45 +00:00
|
|
|
|
typedef HashMapWithSavedHash<StringRef, AggregateDataPtr> AggregatedDataWithStringKey;
|
2014-04-28 01:48:24 +00:00
|
|
|
|
typedef HashMap<UInt128, AggregateDataPtr, UInt128Hash> AggregatedDataWithKeys128;
|
|
|
|
|
typedef HashMap<UInt128, std::pair<StringRef*, AggregateDataPtr>, UInt128TrivialHash> AggregatedDataHashed;
|
2011-09-26 07:25:22 +00:00
|
|
|
|
|
2013-02-16 18:59:05 +00:00
|
|
|
|
|
2014-10-29 01:18:50 +00:00
|
|
|
|
/// Специализации для UInt8, UInt16.
|
|
|
|
|
struct TrivialHash
|
|
|
|
|
{
|
|
|
|
|
template <typename T>
|
|
|
|
|
size_t operator() (T key) const
|
|
|
|
|
{
|
|
|
|
|
return key;
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
/// Превращает хэш-таблицу в что-то типа lookup-таблицы. Остаётся неоптимальность - в ячейках хранятся ключи.
|
|
|
|
|
template <size_t key_bits>
|
|
|
|
|
struct HashTableFixedGrower
|
|
|
|
|
{
|
|
|
|
|
size_t bufSize() const { return 1 << key_bits; }
|
|
|
|
|
size_t place(size_t x) const { return x; }
|
2014-10-29 02:35:16 +00:00
|
|
|
|
/// Тут можно было бы написать __builtin_unreachable(), но компилятор не до конца всё оптимизирует, и получается менее эффективно.
|
|
|
|
|
size_t next(size_t pos) const { return pos + 1; }
|
2014-10-29 01:18:50 +00:00
|
|
|
|
bool overflow(size_t elems) const { return false; }
|
|
|
|
|
|
|
|
|
|
void increaseSize() { __builtin_unreachable(); }
|
|
|
|
|
void set(size_t num_elems) {}
|
|
|
|
|
void setBufSize(size_t buf_size_) {}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
typedef HashMap<UInt64, AggregateDataPtr, TrivialHash, HashTableFixedGrower<8>> AggregatedDataWithUInt8Key;
|
|
|
|
|
typedef HashMap<UInt64, AggregateDataPtr, TrivialHash, HashTableFixedGrower<16>> AggregatedDataWithUInt16Key;
|
|
|
|
|
|
2014-10-29 02:35:16 +00:00
|
|
|
|
template <typename FieldType>
|
|
|
|
|
struct AggregatedDataWithUIntKey
|
|
|
|
|
{
|
|
|
|
|
using Type = AggregatedDataWithUInt64Key;
|
|
|
|
|
static constexpr bool never_overflows = false;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
template <>
|
|
|
|
|
struct AggregatedDataWithUIntKey<UInt8>
|
|
|
|
|
{
|
|
|
|
|
using Type = AggregatedDataWithUInt8Key;
|
|
|
|
|
static constexpr bool never_overflows = true; /// Говорит о том, что в результате агрегации не может быть много записей.
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
template <>
|
|
|
|
|
struct AggregatedDataWithUIntKey<UInt16>
|
|
|
|
|
{
|
|
|
|
|
using Type = AggregatedDataWithUInt16Key;
|
|
|
|
|
static constexpr bool never_overflows = true;
|
|
|
|
|
};
|
2014-10-29 01:18:50 +00:00
|
|
|
|
|
|
|
|
|
|
2014-05-10 00:31:22 +00:00
|
|
|
|
/// Для случая, когда есть один числовой ключ.
|
2014-10-29 01:18:50 +00:00
|
|
|
|
template <typename FieldType> /// UInt8/16/32/64 для любых типов соответствующей битности.
|
|
|
|
|
struct AggregationMethodOneNumber
|
2014-05-10 00:31:22 +00:00
|
|
|
|
{
|
2014-10-29 01:18:50 +00:00
|
|
|
|
typedef typename AggregatedDataWithUIntKey<FieldType>::Type Data;
|
|
|
|
|
typedef typename Data::key_type Key;
|
|
|
|
|
typedef typename Data::mapped_type Mapped;
|
|
|
|
|
typedef typename Data::iterator iterator;
|
|
|
|
|
typedef typename Data::const_iterator const_iterator;
|
2014-05-10 00:31:22 +00:00
|
|
|
|
|
2014-10-29 02:35:16 +00:00
|
|
|
|
static constexpr bool never_overflows = AggregatedDataWithUIntKey<FieldType>::never_overflows;
|
|
|
|
|
|
2014-05-10 00:31:22 +00:00
|
|
|
|
Data data;
|
|
|
|
|
|
2014-10-31 23:20:18 +00:00
|
|
|
|
const FieldType * column;
|
2014-05-10 00:31:22 +00:00
|
|
|
|
|
|
|
|
|
/** Вызывается в начале обработки каждого блока.
|
|
|
|
|
* Устанавливает переменные, необходимые для остальных методов, вызываемых во внутренних циклах.
|
|
|
|
|
*/
|
|
|
|
|
void init(ConstColumnPlainPtrs & key_columns)
|
|
|
|
|
{
|
2014-10-31 23:20:18 +00:00
|
|
|
|
column = &static_cast<const ColumnVector<FieldType> *>(key_columns[0])->getData()[0];
|
2014-05-10 00:31:22 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Достать из ключевых столбцов ключ для вставки в хэш-таблицу.
|
|
|
|
|
Key getKey(
|
|
|
|
|
const ConstColumnPlainPtrs & key_columns, /// Ключевые столбцы.
|
|
|
|
|
size_t keys_size, /// Количество ключевых столбцов.
|
|
|
|
|
size_t i, /// Из какой строки блока достать ключ.
|
|
|
|
|
const Sizes & key_sizes, /// Если ключи фиксированной длины - их длины. Не используется в методах агрегации по ключам переменной длины.
|
|
|
|
|
StringRefs & keys) const /// Сюда могут быть записаны ссылки на данные ключей в столбцах. Они могут быть использованы в дальнейшем.
|
|
|
|
|
{
|
2014-10-31 23:20:18 +00:00
|
|
|
|
return get64(column[i]);
|
2014-05-10 00:31:22 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Из значения в хэш-таблице получить AggregateDataPtr.
|
|
|
|
|
static AggregateDataPtr & getAggregateData(Mapped & value) { return value; }
|
|
|
|
|
static const AggregateDataPtr & getAggregateData(const Mapped & value) { return value; }
|
|
|
|
|
|
|
|
|
|
/** Разместить дополнительные данные, если это необходимо, в случае, когда в хэш-таблицу был вставлен новый ключ.
|
|
|
|
|
*/
|
2014-08-24 07:17:50 +00:00
|
|
|
|
static void onNewKey(iterator & it, size_t keys_size, size_t i, StringRefs & keys, Arena & pool)
|
2014-05-10 00:31:22 +00:00
|
|
|
|
{
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/** Вставить ключ из хэш-таблицы в столбцы.
|
|
|
|
|
*/
|
|
|
|
|
static void insertKeyIntoColumns(const_iterator & it, ColumnPlainPtrs & key_columns, size_t keys_size, const Sizes & key_sizes)
|
|
|
|
|
{
|
2014-10-29 01:18:50 +00:00
|
|
|
|
static_cast<ColumnVector<FieldType> *>(key_columns[0])->insertData(reinterpret_cast<const char *>(&it->first), sizeof(it->first));
|
2014-05-10 00:31:22 +00:00
|
|
|
|
}
|
2014-10-31 23:20:18 +00:00
|
|
|
|
|
|
|
|
|
private:
|
|
|
|
|
UInt64 get64(FieldType x) const
|
|
|
|
|
{
|
|
|
|
|
return x;
|
|
|
|
|
}
|
2014-05-10 00:31:22 +00:00
|
|
|
|
};
|
|
|
|
|
|
2014-10-31 23:20:18 +00:00
|
|
|
|
template <>
|
|
|
|
|
inline UInt64 AggregationMethodOneNumber<Float64>::get64(Float64 x) const
|
|
|
|
|
{
|
|
|
|
|
union
|
|
|
|
|
{
|
|
|
|
|
Float64 src;
|
|
|
|
|
UInt64 res;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
src = x;
|
|
|
|
|
return res;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
template <>
|
|
|
|
|
inline UInt64 AggregationMethodOneNumber<Float32>::get64(Float32 x) const
|
|
|
|
|
{
|
|
|
|
|
union
|
|
|
|
|
{
|
|
|
|
|
Float32 src;
|
|
|
|
|
UInt64 res;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
res = 0;
|
|
|
|
|
src = x;
|
|
|
|
|
return res;
|
|
|
|
|
}
|
|
|
|
|
|
2014-05-10 00:31:22 +00:00
|
|
|
|
|
|
|
|
|
/// Для случая, когда есть один строковый ключ.
|
|
|
|
|
struct AggregationMethodString
|
|
|
|
|
{
|
|
|
|
|
typedef AggregatedDataWithStringKey Data;
|
|
|
|
|
typedef Data::key_type Key;
|
|
|
|
|
typedef Data::mapped_type Mapped;
|
|
|
|
|
typedef Data::iterator iterator;
|
|
|
|
|
typedef Data::const_iterator const_iterator;
|
|
|
|
|
|
2014-10-29 02:35:16 +00:00
|
|
|
|
static constexpr bool never_overflows = false;
|
|
|
|
|
|
2014-05-10 00:31:22 +00:00
|
|
|
|
Data data;
|
|
|
|
|
|
|
|
|
|
const ColumnString::Offsets_t * offsets;
|
|
|
|
|
const ColumnString::Chars_t * chars;
|
|
|
|
|
|
|
|
|
|
void init(ConstColumnPlainPtrs & key_columns)
|
|
|
|
|
{
|
|
|
|
|
const IColumn & column = *key_columns[0];
|
|
|
|
|
const ColumnString & column_string = static_cast<const ColumnString &>(column);
|
|
|
|
|
offsets = &column_string.getOffsets();
|
|
|
|
|
chars = &column_string.getChars();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Key getKey(
|
|
|
|
|
const ConstColumnPlainPtrs & key_columns,
|
|
|
|
|
size_t keys_size,
|
|
|
|
|
size_t i,
|
|
|
|
|
const Sizes & key_sizes,
|
|
|
|
|
StringRefs & keys) const
|
|
|
|
|
{
|
|
|
|
|
return StringRef(&(*chars)[i == 0 ? 0 : (*offsets)[i - 1]], (i == 0 ? (*offsets)[i] : ((*offsets)[i] - (*offsets)[i - 1])) - 1);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static AggregateDataPtr & getAggregateData(Mapped & value) { return value; }
|
|
|
|
|
static const AggregateDataPtr & getAggregateData(const Mapped & value) { return value; }
|
|
|
|
|
|
2014-08-24 07:17:50 +00:00
|
|
|
|
static void onNewKey(iterator & it, size_t keys_size, size_t i, StringRefs & keys, Arena & pool)
|
2014-05-10 00:31:22 +00:00
|
|
|
|
{
|
2014-05-10 01:37:12 +00:00
|
|
|
|
it->first.data = pool.insert(it->first.data, it->first.size);
|
2014-05-10 00:31:22 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void insertKeyIntoColumns(const_iterator & it, ColumnPlainPtrs & key_columns, size_t keys_size, const Sizes & key_sizes)
|
|
|
|
|
{
|
|
|
|
|
key_columns[0]->insertData(it->first.data, it->first.size);
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/// Для случая, когда есть один строковый ключ фиксированной длины.
|
|
|
|
|
struct AggregationMethodFixedString
|
|
|
|
|
{
|
|
|
|
|
typedef AggregatedDataWithStringKey Data;
|
|
|
|
|
typedef Data::key_type Key;
|
|
|
|
|
typedef Data::mapped_type Mapped;
|
|
|
|
|
typedef Data::iterator iterator;
|
|
|
|
|
typedef Data::const_iterator const_iterator;
|
|
|
|
|
|
2014-10-29 02:35:16 +00:00
|
|
|
|
static constexpr bool never_overflows = false;
|
|
|
|
|
|
2014-05-10 00:31:22 +00:00
|
|
|
|
Data data;
|
|
|
|
|
|
|
|
|
|
size_t n;
|
|
|
|
|
const ColumnFixedString::Chars_t * chars;
|
|
|
|
|
|
|
|
|
|
void init(ConstColumnPlainPtrs & key_columns)
|
|
|
|
|
{
|
|
|
|
|
const IColumn & column = *key_columns[0];
|
|
|
|
|
const ColumnFixedString & column_string = static_cast<const ColumnFixedString &>(column);
|
|
|
|
|
n = column_string.getN();
|
|
|
|
|
chars = &column_string.getChars();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Key getKey(
|
|
|
|
|
const ConstColumnPlainPtrs & key_columns,
|
|
|
|
|
size_t keys_size,
|
|
|
|
|
size_t i,
|
|
|
|
|
const Sizes & key_sizes,
|
|
|
|
|
StringRefs & keys) const
|
|
|
|
|
{
|
|
|
|
|
return StringRef(&(*chars)[i * n], n);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static AggregateDataPtr & getAggregateData(Mapped & value) { return value; }
|
|
|
|
|
static const AggregateDataPtr & getAggregateData(const Mapped & value) { return value; }
|
|
|
|
|
|
2014-08-24 07:17:50 +00:00
|
|
|
|
static void onNewKey(iterator & it, size_t keys_size, size_t i, StringRefs & keys, Arena & pool)
|
2014-05-10 00:31:22 +00:00
|
|
|
|
{
|
2014-05-10 01:37:12 +00:00
|
|
|
|
it->first.data = pool.insert(it->first.data, it->first.size);
|
2014-05-10 00:31:22 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void insertKeyIntoColumns(const_iterator & it, ColumnPlainPtrs & key_columns, size_t keys_size, const Sizes & key_sizes)
|
|
|
|
|
{
|
|
|
|
|
key_columns[0]->insertData(it->first.data, it->first.size);
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/// Для случая, когда все ключи фиксированной длины, и они помещаются в 128 бит.
|
|
|
|
|
struct AggregationMethodKeys128
|
|
|
|
|
{
|
|
|
|
|
typedef AggregatedDataWithKeys128 Data;
|
|
|
|
|
typedef Data::key_type Key;
|
|
|
|
|
typedef Data::mapped_type Mapped;
|
|
|
|
|
typedef Data::iterator iterator;
|
|
|
|
|
typedef Data::const_iterator const_iterator;
|
|
|
|
|
|
2014-10-29 02:35:16 +00:00
|
|
|
|
static constexpr bool never_overflows = false;
|
|
|
|
|
|
2014-05-10 00:31:22 +00:00
|
|
|
|
Data data;
|
|
|
|
|
|
|
|
|
|
void init(ConstColumnPlainPtrs & key_columns)
|
|
|
|
|
{
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Key getKey(
|
|
|
|
|
const ConstColumnPlainPtrs & key_columns,
|
|
|
|
|
size_t keys_size,
|
|
|
|
|
size_t i,
|
|
|
|
|
const Sizes & key_sizes,
|
|
|
|
|
StringRefs & keys) const
|
|
|
|
|
{
|
|
|
|
|
return pack128(i, keys_size, key_columns, key_sizes);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static AggregateDataPtr & getAggregateData(Mapped & value) { return value; }
|
|
|
|
|
static const AggregateDataPtr & getAggregateData(const Mapped & value) { return value; }
|
|
|
|
|
|
2014-08-24 07:17:50 +00:00
|
|
|
|
static void onNewKey(iterator & it, size_t keys_size, size_t i, StringRefs & keys, Arena & pool)
|
2014-05-10 00:31:22 +00:00
|
|
|
|
{
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void insertKeyIntoColumns(const_iterator & it, ColumnPlainPtrs & key_columns, size_t keys_size, const Sizes & key_sizes)
|
|
|
|
|
{
|
|
|
|
|
size_t offset = 0;
|
|
|
|
|
for (size_t i = 0; i < keys_size; ++i)
|
|
|
|
|
{
|
|
|
|
|
size_t size = key_sizes[i];
|
|
|
|
|
key_columns[i]->insertData(reinterpret_cast<const char *>(&it->first) + offset, size);
|
|
|
|
|
offset += size;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/// Для остальных случаев. Агрегирует по 128 битному хэшу от ключа. (При этом, строки, содержащие нули посередине, могут склеиться.)
|
|
|
|
|
struct AggregationMethodHashed
|
|
|
|
|
{
|
|
|
|
|
typedef AggregatedDataHashed Data;
|
|
|
|
|
typedef Data::key_type Key;
|
|
|
|
|
typedef Data::mapped_type Mapped;
|
|
|
|
|
typedef Data::iterator iterator;
|
|
|
|
|
typedef Data::const_iterator const_iterator;
|
|
|
|
|
|
2014-10-29 02:35:16 +00:00
|
|
|
|
static constexpr bool never_overflows = false;
|
|
|
|
|
|
2014-05-10 00:31:22 +00:00
|
|
|
|
Data data;
|
|
|
|
|
|
|
|
|
|
void init(ConstColumnPlainPtrs & key_columns)
|
|
|
|
|
{
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Key getKey(
|
|
|
|
|
const ConstColumnPlainPtrs & key_columns,
|
|
|
|
|
size_t keys_size,
|
|
|
|
|
size_t i,
|
|
|
|
|
const Sizes & key_sizes,
|
|
|
|
|
StringRefs & keys) const
|
|
|
|
|
{
|
|
|
|
|
return hash128(i, keys_size, key_columns, keys);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static AggregateDataPtr & getAggregateData(Mapped & value) { return value.second; }
|
|
|
|
|
static const AggregateDataPtr & getAggregateData(const Mapped & value) { return value.second; }
|
|
|
|
|
|
2014-08-24 07:17:50 +00:00
|
|
|
|
static void onNewKey(iterator & it, size_t keys_size, size_t i, StringRefs & keys, Arena & pool)
|
2014-05-10 00:31:22 +00:00
|
|
|
|
{
|
2014-05-10 01:37:12 +00:00
|
|
|
|
it->second.first = placeKeysInPool(i, keys_size, keys, pool);
|
2014-05-10 00:31:22 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void insertKeyIntoColumns(const_iterator & it, ColumnPlainPtrs & key_columns, size_t keys_size, const Sizes & key_sizes)
|
|
|
|
|
{
|
|
|
|
|
for (size_t i = 0; i < keys_size; ++i)
|
|
|
|
|
key_columns[i]->insertDataWithTerminatingZero(it->second.first[i].data, it->second.first[i].size);
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Aggregator;
|
2011-09-26 07:25:22 +00:00
|
|
|
|
|
2013-12-16 02:32:00 +00:00
|
|
|
|
struct AggregatedDataVariants : private boost::noncopyable
|
2011-09-26 07:25:22 +00:00
|
|
|
|
{
|
2013-02-16 18:59:05 +00:00
|
|
|
|
/** Работа с состояниями агрегатных функций в пуле устроена следующим (неудобным) образом:
|
|
|
|
|
* - при агрегации, состояния создаются в пуле с помощью функции IAggregateFunction::create (внутри - placement new произвольной структуры);
|
|
|
|
|
* - они должны быть затем уничтожены с помощью IAggregateFunction::destroy (внутри - вызов деструктора произвольной структуры);
|
|
|
|
|
* - если агрегация завершена, то, в функции Aggregator::convertToBlock, указатели на состояния агрегатных функций
|
|
|
|
|
* записываются в ColumnAggregateFunction; ColumnAggregateFunction "захватывает владение" ими, то есть - вызывает destroy в своём деструкторе.
|
|
|
|
|
* - если при агрегации, до вызова Aggregator::convertToBlock вылетело исключение, то состояния агрегатных функций всё-равно должны быть уничтожены,
|
|
|
|
|
* иначе для сложных состояний (наприемер, AggregateFunctionUniq), будут утечки памяти;
|
|
|
|
|
* - чтобы, в этом случае, уничтожить состояния, в деструкторе вызывается метод Aggregator::destroyAggregateStates,
|
2014-04-08 07:58:53 +00:00
|
|
|
|
* но только если переменная aggregator (см. ниже) не nullptr;
|
2013-02-16 18:59:05 +00:00
|
|
|
|
* - то есть, пока вы не передали владение состояниями агрегатных функций в ColumnAggregateFunction, установите переменную aggregator,
|
|
|
|
|
* чтобы при возникновении исключения, состояния были корректно уничтожены.
|
|
|
|
|
*
|
|
|
|
|
* PS. Это можно исправить, сделав пул, который знает о том, какие состояния агрегатных функций и в каком порядке в него уложены, и умеет сам их уничтожать.
|
|
|
|
|
* Но это вряд ли можно просто сделать, так как в этот же пул планируется класть строки переменной длины.
|
|
|
|
|
* В этом случае, пул не сможет знать, по каким смещениям хранятся объекты.
|
|
|
|
|
*/
|
2014-04-08 07:31:51 +00:00
|
|
|
|
Aggregator * aggregator = nullptr;
|
2014-05-10 00:31:22 +00:00
|
|
|
|
|
|
|
|
|
size_t keys_size; /// Количество ключей NOTE нужно ли это поле?
|
|
|
|
|
Sizes key_sizes; /// Размеры ключей, если ключи фиксированной длины
|
2014-08-24 07:17:50 +00:00
|
|
|
|
|
2013-02-09 00:12:04 +00:00
|
|
|
|
/// Пулы для состояний агрегатных функций. Владение потом будет передано в ColumnAggregateFunction.
|
|
|
|
|
Arenas aggregates_pools;
|
2013-02-16 18:59:05 +00:00
|
|
|
|
Arena * aggregates_pool; /// Пул, который сейчас используется для аллокации.
|
2011-09-26 15:45:31 +00:00
|
|
|
|
|
2014-02-26 11:44:54 +00:00
|
|
|
|
/** Специализация для случая, когда ключи отсутствуют, и для ключей, не попавших в max_rows_to_group_by.
|
2013-05-04 15:46:50 +00:00
|
|
|
|
*/
|
2014-04-08 07:31:51 +00:00
|
|
|
|
AggregatedDataWithoutKey without_key = nullptr;
|
2011-09-26 15:45:31 +00:00
|
|
|
|
|
2014-10-29 01:18:50 +00:00
|
|
|
|
std::unique_ptr<AggregationMethodOneNumber<UInt8>> key8;
|
|
|
|
|
std::unique_ptr<AggregationMethodOneNumber<UInt16>> key16;
|
|
|
|
|
std::unique_ptr<AggregationMethodOneNumber<UInt32>> key32;
|
|
|
|
|
std::unique_ptr<AggregationMethodOneNumber<UInt64>> key64;
|
2014-05-10 00:31:22 +00:00
|
|
|
|
std::unique_ptr<AggregationMethodString> key_string;
|
|
|
|
|
std::unique_ptr<AggregationMethodFixedString> key_fixed_string;
|
|
|
|
|
std::unique_ptr<AggregationMethodKeys128> keys128;
|
|
|
|
|
std::unique_ptr<AggregationMethodHashed> hashed;
|
2014-08-24 07:17:50 +00:00
|
|
|
|
|
2012-02-27 06:28:20 +00:00
|
|
|
|
enum Type
|
|
|
|
|
{
|
2014-10-29 01:18:50 +00:00
|
|
|
|
EMPTY = 0,
|
|
|
|
|
WITHOUT_KEY,
|
|
|
|
|
KEY_8,
|
|
|
|
|
KEY_16,
|
|
|
|
|
KEY_32,
|
|
|
|
|
KEY_64,
|
|
|
|
|
KEY_STRING,
|
|
|
|
|
KEY_FIXED_STRING,
|
|
|
|
|
KEYS_128,
|
|
|
|
|
HASHED,
|
2012-02-27 06:28:20 +00:00
|
|
|
|
};
|
2014-04-08 07:31:51 +00:00
|
|
|
|
Type type = EMPTY;
|
2012-05-10 07:47:13 +00:00
|
|
|
|
|
2014-04-08 07:31:51 +00:00
|
|
|
|
AggregatedDataVariants() : aggregates_pools(1, new Arena), aggregates_pool(&*aggregates_pools.back()) {}
|
2012-12-25 19:28:59 +00:00
|
|
|
|
bool empty() const { return type == EMPTY; }
|
|
|
|
|
|
2013-02-16 18:59:05 +00:00
|
|
|
|
~AggregatedDataVariants();
|
2013-02-09 02:20:26 +00:00
|
|
|
|
|
2013-12-16 02:32:00 +00:00
|
|
|
|
void init(Type type_)
|
|
|
|
|
{
|
|
|
|
|
type = type_;
|
|
|
|
|
|
|
|
|
|
switch (type)
|
|
|
|
|
{
|
2014-05-10 00:31:22 +00:00
|
|
|
|
case EMPTY: break;
|
|
|
|
|
case WITHOUT_KEY: break;
|
2014-10-29 01:18:50 +00:00
|
|
|
|
case KEY_8: key8 .reset(new decltype(key8)::element_type); break;
|
|
|
|
|
case KEY_16: key16 .reset(new decltype(key16)::element_type); break;
|
|
|
|
|
case KEY_32: key32 .reset(new decltype(key32)::element_type); break;
|
|
|
|
|
case KEY_64: key64 .reset(new decltype(key64)::element_type); break;
|
|
|
|
|
case KEY_STRING: key_string .reset(new decltype(key_string)::element_type); break;
|
|
|
|
|
case KEY_FIXED_STRING: key_fixed_string.reset(new decltype(key_fixed_string)::element_type); break;
|
|
|
|
|
case KEYS_128: keys128 .reset(new decltype(keys128)::element_type); break;
|
|
|
|
|
case HASHED: hashed .reset(new decltype(hashed)::element_type); break;
|
2013-12-16 02:32:00 +00:00
|
|
|
|
|
|
|
|
|
default:
|
|
|
|
|
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2012-12-25 19:28:59 +00:00
|
|
|
|
size_t size() const
|
|
|
|
|
{
|
|
|
|
|
switch (type)
|
|
|
|
|
{
|
2014-05-10 00:31:22 +00:00
|
|
|
|
case EMPTY: return 0;
|
|
|
|
|
case WITHOUT_KEY: return 1;
|
2014-10-29 01:18:50 +00:00
|
|
|
|
case KEY_8: return key8->data.size() + (without_key != nullptr);
|
|
|
|
|
case KEY_16: return key16->data.size() + (without_key != nullptr);
|
|
|
|
|
case KEY_32: return key32->data.size() + (without_key != nullptr);
|
2014-05-10 00:31:22 +00:00
|
|
|
|
case KEY_64: return key64->data.size() + (without_key != nullptr);
|
|
|
|
|
case KEY_STRING: return key_string->data.size() + (without_key != nullptr);
|
|
|
|
|
case KEY_FIXED_STRING: return key_fixed_string->data.size() + (without_key != nullptr);
|
|
|
|
|
case KEYS_128: return keys128->data.size() + (without_key != nullptr);
|
|
|
|
|
case HASHED: return hashed->data.size() + (without_key != nullptr);
|
2012-12-25 19:28:59 +00:00
|
|
|
|
|
|
|
|
|
default:
|
|
|
|
|
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
|
|
|
|
|
}
|
|
|
|
|
}
|
2013-02-09 01:02:52 +00:00
|
|
|
|
|
|
|
|
|
const char * getMethodName() const
|
|
|
|
|
{
|
|
|
|
|
switch (type)
|
|
|
|
|
{
|
2014-05-10 00:31:22 +00:00
|
|
|
|
case EMPTY: return "EMPTY";
|
|
|
|
|
case WITHOUT_KEY: return "WITHOUT_KEY";
|
2014-10-29 01:18:50 +00:00
|
|
|
|
case KEY_8: return "KEY_8";
|
|
|
|
|
case KEY_16: return "KEY_16";
|
|
|
|
|
case KEY_32: return "KEY_32";
|
2014-05-10 00:31:22 +00:00
|
|
|
|
case KEY_64: return "KEY_64";
|
|
|
|
|
case KEY_STRING: return "KEY_STRING";
|
|
|
|
|
case KEY_FIXED_STRING: return "KEY_FIXED_STRING";
|
|
|
|
|
case KEYS_128: return "KEYS_128";
|
|
|
|
|
case HASHED: return "HASHED";
|
2013-02-09 01:02:52 +00:00
|
|
|
|
|
|
|
|
|
default:
|
|
|
|
|
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
|
|
|
|
|
}
|
|
|
|
|
}
|
2011-09-26 07:25:22 +00:00
|
|
|
|
};
|
|
|
|
|
|
2012-02-27 06:28:20 +00:00
|
|
|
|
typedef SharedPtr<AggregatedDataVariants> AggregatedDataVariantsPtr;
|
|
|
|
|
typedef std::vector<AggregatedDataVariantsPtr> ManyAggregatedDataVariants;
|
|
|
|
|
|
2011-09-19 01:42:16 +00:00
|
|
|
|
|
2012-03-05 07:58:34 +00:00
|
|
|
|
/** Агрегирует источник блоков.
|
2011-09-19 01:42:16 +00:00
|
|
|
|
*/
|
2011-09-19 03:34:23 +00:00
|
|
|
|
class Aggregator
|
2011-09-19 01:42:16 +00:00
|
|
|
|
{
|
|
|
|
|
public:
|
2014-02-26 11:44:54 +00:00
|
|
|
|
Aggregator(const ColumnNumbers & keys_, const AggregateDescriptions & aggregates_, bool overflow_row_,
|
2014-02-17 23:56:45 +00:00
|
|
|
|
size_t max_rows_to_group_by_ = 0, OverflowMode group_by_overflow_mode_ = OverflowMode::THROW)
|
2013-06-20 14:24:43 +00:00
|
|
|
|
: keys(keys_), aggregates(aggregates_), aggregates_size(aggregates.size()),
|
2014-05-10 00:31:22 +00:00
|
|
|
|
overflow_row(overflow_row_),
|
2012-12-25 19:28:59 +00:00
|
|
|
|
max_rows_to_group_by(max_rows_to_group_by_), group_by_overflow_mode(group_by_overflow_mode_),
|
|
|
|
|
log(&Logger::get("Aggregator"))
|
2012-09-12 18:49:21 +00:00
|
|
|
|
{
|
2013-06-20 14:24:43 +00:00
|
|
|
|
std::sort(keys.begin(), keys.end());
|
|
|
|
|
keys.erase(std::unique(keys.begin(), keys.end()), keys.end());
|
|
|
|
|
keys_size = keys.size();
|
2012-09-12 18:49:21 +00:00
|
|
|
|
}
|
2012-05-31 00:33:42 +00:00
|
|
|
|
|
2014-02-26 11:44:54 +00:00
|
|
|
|
Aggregator(const Names & key_names_, const AggregateDescriptions & aggregates_, bool overflow_row_,
|
2014-02-17 23:56:45 +00:00
|
|
|
|
size_t max_rows_to_group_by_ = 0, OverflowMode group_by_overflow_mode_ = OverflowMode::THROW)
|
2013-06-20 14:24:43 +00:00
|
|
|
|
: key_names(key_names_), aggregates(aggregates_), aggregates_size(aggregates.size()),
|
2014-05-10 00:31:22 +00:00
|
|
|
|
overflow_row(overflow_row_),
|
2012-12-25 19:28:59 +00:00
|
|
|
|
max_rows_to_group_by(max_rows_to_group_by_), group_by_overflow_mode(group_by_overflow_mode_),
|
|
|
|
|
log(&Logger::get("Aggregator"))
|
2012-09-12 18:49:21 +00:00
|
|
|
|
{
|
2013-06-20 14:24:43 +00:00
|
|
|
|
std::sort(key_names.begin(), key_names.end());
|
|
|
|
|
key_names.erase(std::unique(key_names.begin(), key_names.end()), key_names.end());
|
|
|
|
|
keys_size = key_names.size();
|
2012-09-12 18:49:21 +00:00
|
|
|
|
}
|
2011-09-19 01:42:16 +00:00
|
|
|
|
|
2012-03-05 07:58:34 +00:00
|
|
|
|
/// Агрегировать источник. Получить результат в виде одной из структур данных.
|
2011-09-26 07:25:22 +00:00
|
|
|
|
void execute(BlockInputStreamPtr stream, AggregatedDataVariants & result);
|
2011-09-19 01:42:16 +00:00
|
|
|
|
|
2014-05-10 05:16:23 +00:00
|
|
|
|
typedef std::vector<ConstColumnPlainPtrs> AggregateColumns;
|
|
|
|
|
typedef std::vector<ColumnAggregateFunction::Container_t *> AggregateColumnsData;
|
|
|
|
|
|
|
|
|
|
/// Обработать один блок. Вернуть false, если обработку следует прервать (при group_by_overflow_mode = 'break').
|
|
|
|
|
bool executeOnBlock(Block & block, AggregatedDataVariants & result,
|
|
|
|
|
ConstColumnPlainPtrs & key_columns, AggregateColumns & aggregate_columns, /// Передаются, чтобы не создавать их заново на каждый блок
|
|
|
|
|
Sizes & key_sizes, StringRefs & keys, /// - передайте соответствующие объекты, которые изначально пустые.
|
|
|
|
|
bool & no_more_keys);
|
|
|
|
|
|
2013-09-01 04:55:41 +00:00
|
|
|
|
/** Преобразовать структуру данных агрегации в блок.
|
2014-02-26 11:44:54 +00:00
|
|
|
|
* Если overflow_row = true, то агрегаты для строк, не попавших в max_rows_to_group_by, кладутся в первую строчку возвращаемого блока.
|
2013-11-03 23:35:18 +00:00
|
|
|
|
*
|
|
|
|
|
* Если final = false, то в качестве столбцов-агрегатов создаются ColumnAggregateFunction с состоянием вычислений,
|
|
|
|
|
* которые могут быть затем объединены с другими состояниями (для распределённой обработки запроса).
|
|
|
|
|
* Если final = true, то в качестве столбцов-агрегатов создаются столбцы с готовыми значениями.
|
2013-09-01 04:55:41 +00:00
|
|
|
|
*/
|
2014-02-26 11:44:54 +00:00
|
|
|
|
Block convertToBlock(AggregatedDataVariants & data_variants, bool final);
|
2012-02-27 06:28:20 +00:00
|
|
|
|
|
2014-02-26 11:44:54 +00:00
|
|
|
|
/** Объединить несколько структур данных агрегации в одну. (В первый непустой элемент массива.) Все варианты агрегации должны быть одинаковыми!
|
2013-02-09 02:20:26 +00:00
|
|
|
|
* После объединения, все стркутуры агрегации (а не только те, в которую они будут слиты) должны жить, пока не будет вызвана функция convertToBlock.
|
|
|
|
|
* Это нужно, так как в слитом результате могут остаться указатели на память в пуле, которым владеют другие структуры агрегации.
|
|
|
|
|
*/
|
2012-02-27 06:28:20 +00:00
|
|
|
|
AggregatedDataVariantsPtr merge(ManyAggregatedDataVariants & data_variants);
|
|
|
|
|
|
2012-05-30 01:38:02 +00:00
|
|
|
|
/** Объединить несколько агрегированных блоков в одну структуру данных.
|
2013-09-01 04:55:41 +00:00
|
|
|
|
* (Доагрегировать несколько блоков, которые представляют собой результат независимых агрегаций с удалённых серверов.)
|
2014-02-26 11:44:54 +00:00
|
|
|
|
* Если overflow_row = true, то предполагается, что агрегаты для строк, не попавших в max_rows_to_group_by, расположены в первой строке каждого блока.
|
2012-05-30 01:38:02 +00:00
|
|
|
|
*/
|
|
|
|
|
void merge(BlockInputStreamPtr stream, AggregatedDataVariants & result);
|
|
|
|
|
|
2013-05-03 10:20:53 +00:00
|
|
|
|
/// Для IBlockInputStream.
|
|
|
|
|
String getID() const;
|
|
|
|
|
|
2013-09-15 07:17:26 +00:00
|
|
|
|
protected:
|
2013-02-16 18:59:05 +00:00
|
|
|
|
friend struct AggregatedDataVariants;
|
2014-08-24 07:17:50 +00:00
|
|
|
|
|
2011-09-19 01:42:16 +00:00
|
|
|
|
ColumnNumbers keys;
|
2011-09-24 20:32:41 +00:00
|
|
|
|
Names key_names;
|
2011-09-19 01:42:16 +00:00
|
|
|
|
AggregateDescriptions aggregates;
|
2013-02-08 20:34:30 +00:00
|
|
|
|
std::vector<IAggregateFunction *> aggregate_functions;
|
2012-09-12 18:49:21 +00:00
|
|
|
|
size_t keys_size;
|
2013-02-03 18:08:52 +00:00
|
|
|
|
size_t aggregates_size;
|
2014-02-26 11:44:54 +00:00
|
|
|
|
/// Нужно ли класть в AggregatedDataVariants::without_key агрегаты для ключей, не попавших в max_rows_to_group_by.
|
|
|
|
|
bool overflow_row;
|
2011-09-19 03:34:23 +00:00
|
|
|
|
|
2013-02-08 20:34:30 +00:00
|
|
|
|
Sizes offsets_of_aggregate_states; /// Смещение до n-ой агрегатной функции в строке из агрегатных функций.
|
2014-05-10 00:31:22 +00:00
|
|
|
|
size_t total_size_of_aggregate_states = 0; /// Суммарный размер строки из агрегатных функций.
|
|
|
|
|
bool all_aggregates_has_trivial_destructor = false;
|
2013-02-08 20:34:30 +00:00
|
|
|
|
|
2012-03-05 07:58:34 +00:00
|
|
|
|
/// Для инициализации от первого блока при конкуррентном использовании.
|
2014-05-10 00:31:22 +00:00
|
|
|
|
bool initialized = false;
|
2012-03-05 07:58:34 +00:00
|
|
|
|
Poco::FastMutex mutex;
|
|
|
|
|
|
2012-12-25 19:28:59 +00:00
|
|
|
|
size_t max_rows_to_group_by;
|
2014-02-17 23:56:45 +00:00
|
|
|
|
OverflowMode group_by_overflow_mode;
|
2012-12-25 19:28:59 +00:00
|
|
|
|
|
2011-09-19 03:34:23 +00:00
|
|
|
|
Block sample;
|
2012-03-05 07:58:34 +00:00
|
|
|
|
|
2012-05-31 00:33:42 +00:00
|
|
|
|
Logger * log;
|
|
|
|
|
|
2012-03-05 07:58:34 +00:00
|
|
|
|
/** Если заданы только имена столбцов (key_names, а также aggregates[i].column_name), то вычислить номера столбцов.
|
|
|
|
|
* Сформировать блок - пример результата.
|
|
|
|
|
*/
|
|
|
|
|
void initialize(Block & block);
|
2012-05-30 01:38:02 +00:00
|
|
|
|
|
|
|
|
|
/** Выбрать способ агрегации на основе количества и типов ключей. */
|
2013-06-30 16:56:00 +00:00
|
|
|
|
AggregatedDataVariants::Type chooseAggregationMethod(const ConstColumnPlainPtrs & key_columns, Sizes & key_sizes);
|
2013-02-16 18:59:05 +00:00
|
|
|
|
|
2014-05-19 19:41:56 +00:00
|
|
|
|
/** Создать состояния агрегатных функций для одного ключа.
|
|
|
|
|
*/
|
|
|
|
|
void createAggregateStates(AggregateDataPtr & aggregate_data) const;
|
|
|
|
|
|
2013-02-16 18:59:05 +00:00
|
|
|
|
/** Вызвать методы destroy для состояний агрегатных функций.
|
|
|
|
|
* Используется в обработчике исключений при агрегации, так как RAII в данном случае не применим.
|
|
|
|
|
*/
|
2014-05-19 19:41:56 +00:00
|
|
|
|
void destroyAllAggregateStates(AggregatedDataVariants & result);
|
2014-05-10 00:31:22 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
template <typename Method>
|
|
|
|
|
void executeImpl(
|
|
|
|
|
Method & method,
|
|
|
|
|
Arena * aggregates_pool,
|
|
|
|
|
size_t rows,
|
|
|
|
|
ConstColumnPlainPtrs & key_columns,
|
|
|
|
|
AggregateColumns & aggregate_columns,
|
|
|
|
|
const Sizes & key_sizes,
|
|
|
|
|
StringRefs & keys,
|
|
|
|
|
bool no_more_keys,
|
|
|
|
|
AggregateDataPtr overflow_row) const;
|
|
|
|
|
|
|
|
|
|
template <typename Method>
|
|
|
|
|
void convertToBlockImpl(
|
|
|
|
|
Method & method,
|
|
|
|
|
ColumnPlainPtrs & key_columns,
|
|
|
|
|
AggregateColumnsData & aggregate_columns,
|
|
|
|
|
ColumnPlainPtrs & final_aggregate_columns,
|
|
|
|
|
const Sizes & key_sizes,
|
2014-05-28 14:54:42 +00:00
|
|
|
|
size_t start_row, bool final) const;
|
2014-05-10 00:31:22 +00:00
|
|
|
|
|
|
|
|
|
template <typename Method>
|
|
|
|
|
void mergeDataImpl(
|
|
|
|
|
Method & method_dst,
|
|
|
|
|
Method & method_src) const;
|
|
|
|
|
|
|
|
|
|
template <typename Method>
|
|
|
|
|
void mergeStreamsImpl(
|
|
|
|
|
Method & method,
|
|
|
|
|
Arena * aggregates_pool,
|
|
|
|
|
size_t start_row,
|
|
|
|
|
size_t rows,
|
|
|
|
|
ConstColumnPlainPtrs & key_columns,
|
|
|
|
|
AggregateColumnsData & aggregate_columns,
|
|
|
|
|
const Sizes & key_sizes,
|
|
|
|
|
StringRefs & keys) const;
|
|
|
|
|
|
|
|
|
|
template <typename Method>
|
|
|
|
|
void destroyImpl(
|
|
|
|
|
Method & method) const;
|
2011-09-19 01:42:16 +00:00
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
}
|