dbms: Aggregator: decomposed the code to make it easier to try more aggregation methods; performance improved by up to 25%; fixed a memory leak that occurred when an exception was thrown while aggregating by the KEYS_128 method [#METR-2944].

This commit is contained in:
Alexey Milovidov 2014-05-10 04:31:22 +04:00
parent ea3a22102d
commit 47afe8bd9c
3 changed files with 647 additions and 705 deletions

View File

@ -19,6 +19,11 @@
#include <DB/Interpreters/AggregationCommon.h>
#include <DB/Interpreters/Limits.h>
#include <DB/Columns/ColumnString.h>
#include <DB/Columns/ColumnFixedString.h>
#include <DB/Columns/ColumnAggregateFunction.h>
namespace DB
{
@ -47,8 +52,242 @@ typedef HashMap<StringRef, AggregateDataPtr> AggregatedDataWithStringKey;
typedef HashMap<UInt128, AggregateDataPtr, UInt128Hash> AggregatedDataWithKeys128;
typedef HashMap<UInt128, std::pair<StringRef*, AggregateDataPtr>, UInt128TrivialHash> AggregatedDataHashed;
class Aggregator;
/// For the case when there is a single numeric key.
struct AggregationMethodKey64
{
    typedef AggregatedDataWithUInt64Key Data;
    typedef Data::key_type Key;
    typedef Data::mapped_type Mapped;
    typedef Data::iterator iterator;
    typedef Data::const_iterator const_iterator;

    Data data;

    /// The single key column of the block currently being processed (set by init()).
    const IColumn * column;

    /** Called at the start of processing each block.
      * Sets up the variables needed by the other methods, which are called from inner loops.
      */
    void init(ConstColumnPlainPtrs & key_columns)
    {
        column = key_columns[0];
    }

    /// Extract from the key columns the key for insertion into the hash table.
    Key getKey(
        const ConstColumnPlainPtrs & key_columns,  /// Key columns.
        size_t keys_size,                          /// Number of key columns.
        size_t i,                                  /// Row of the block to take the key from.
        const Sizes & key_sizes,                   /// If the keys are fixed-length - their lengths. Not used by aggregation methods for variable-length keys.
        StringRefs & keys) const                   /// References to the key data in the columns may be written here, for later use.
    {
        return column->get64(i);
    }

    /// Obtain an AggregateDataPtr from the value stored in the hash table.
    static AggregateDataPtr & getAggregateData(Mapped & value) { return value; }
    static const AggregateDataPtr & getAggregateData(const Mapped & value) { return value; }

    /** Allocate additional data, if necessary, when a new key has been inserted into the hash table.
      * Nothing to do here: the 64-bit key is stored by value in the table itself.
      */
    void onNewKey(iterator & it, size_t keys_size, size_t i, StringRefs & keys)
    {
    }

    /** Insert the key from the hash table back into the columns.
      */
    static void insertKeyIntoColumns(const_iterator & it, ColumnPlainPtrs & key_columns, size_t keys_size, const Sizes & key_sizes)
    {
        key_columns[0]->insertData(reinterpret_cast<const char *>(&it->first), sizeof(it->first));
    }
};
/// For the case when there is a single string key.
struct AggregationMethodString
{
    typedef AggregatedDataWithStringKey Data;
    typedef Data::key_type Key;
    typedef Data::mapped_type Mapped;
    typedef Data::iterator iterator;
    typedef Data::const_iterator const_iterator;

    Data data;
    Arena string_pool;    /// NOTE Perhaps it would be better to use aggregates_pool instead?

    /// Offsets and character data of the processed block's string column (set by init()).
    const ColumnString::Offsets_t * offsets;
    const ColumnString::Chars_t * chars;

    /// Called at the start of processing each block: caches pointers into the single key column.
    void init(ConstColumnPlainPtrs & key_columns)
    {
        const IColumn & column = *key_columns[0];
        const ColumnString & column_string = static_cast<const ColumnString &>(column);
        offsets = &column_string.getOffsets();
        chars = &column_string.getChars();
    }

    /// Extract the key for row i: a reference into the column's character buffer
    /// (the - 1 excludes the trailing zero byte from the key length).
    Key getKey(
        const ConstColumnPlainPtrs & key_columns,
        size_t keys_size,
        size_t i,
        const Sizes & key_sizes,
        StringRefs & keys) const
    {
        return StringRef(&(*chars)[i == 0 ? 0 : (*offsets)[i - 1]], (i == 0 ? (*offsets)[i] : ((*offsets)[i] - (*offsets)[i - 1])) - 1);
    }

    static AggregateDataPtr & getAggregateData(Mapped & value) { return value; }
    static const AggregateDataPtr & getAggregateData(const Mapped & value) { return value; }

    /// On a new key: copy the key bytes into string_pool, since the block's column
    /// will not outlive the hash table and the StringRef would otherwise dangle.
    void onNewKey(iterator & it, size_t keys_size, size_t i, StringRefs & keys)
    {
        it->first.data = string_pool.insert(it->first.data, it->first.size);
    }

    /// Insert the key from the hash table back into the columns.
    static void insertKeyIntoColumns(const_iterator & it, ColumnPlainPtrs & key_columns, size_t keys_size, const Sizes & key_sizes)
    {
        key_columns[0]->insertData(it->first.data, it->first.size);
    }
};
/// For the case when there is a single fixed-length string key.
struct AggregationMethodFixedString
{
    typedef AggregatedDataWithStringKey Data;
    typedef Data::key_type Key;
    typedef Data::mapped_type Mapped;
    typedef Data::iterator iterator;
    typedef Data::const_iterator const_iterator;

    Data data;
    Arena string_pool;    /// NOTE Perhaps it would be better to use aggregates_pool instead?

    /// Value length and character data of the processed block's FixedString column (set by init()).
    size_t n;
    const ColumnFixedString::Chars_t * chars;

    /// Called at the start of processing each block: caches the value length and a pointer into the single key column.
    void init(ConstColumnPlainPtrs & key_columns)
    {
        const IColumn & column = *key_columns[0];
        const ColumnFixedString & column_string = static_cast<const ColumnFixedString &>(column);
        n = column_string.getN();
        chars = &column_string.getChars();
    }

    /// Extract the key for row i: a reference to the n bytes of that row's value.
    Key getKey(
        const ConstColumnPlainPtrs & key_columns,
        size_t keys_size,
        size_t i,
        const Sizes & key_sizes,
        StringRefs & keys) const
    {
        return StringRef(&(*chars)[i * n], n);
    }

    static AggregateDataPtr & getAggregateData(Mapped & value) { return value; }
    static const AggregateDataPtr & getAggregateData(const Mapped & value) { return value; }

    /// On a new key: copy the key bytes into string_pool, since the block's column
    /// will not outlive the hash table and the StringRef would otherwise dangle.
    void onNewKey(iterator & it, size_t keys_size, size_t i, StringRefs & keys)
    {
        it->first.data = string_pool.insert(it->first.data, it->first.size);
    }

    /// Insert the key from the hash table back into the columns.
    static void insertKeyIntoColumns(const_iterator & it, ColumnPlainPtrs & key_columns, size_t keys_size, const Sizes & key_sizes)
    {
        key_columns[0]->insertData(it->first.data, it->first.size);
    }
};
/// For the case when all keys are fixed-length, and they fit into 128 bits.
struct AggregationMethodKeys128
{
    typedef AggregatedDataWithKeys128 Data;
    typedef Data::key_type Key;
    typedef Data::mapped_type Mapped;
    typedef Data::iterator iterator;
    typedef Data::const_iterator const_iterator;

    Data data;

    /// Nothing to cache per block: the key is rebuilt from the columns on every getKey() call.
    void init(ConstColumnPlainPtrs & key_columns)
    {
    }

    /// Pack all fixed-length key values of row i into one 128-bit value.
    Key getKey(
        const ConstColumnPlainPtrs & key_columns,
        size_t keys_size,
        size_t i,
        const Sizes & key_sizes,
        StringRefs & keys) const
    {
        return pack128(i, keys_size, key_columns, key_sizes);
    }

    static AggregateDataPtr & getAggregateData(Mapped & value) { return value; }
    static const AggregateDataPtr & getAggregateData(const Mapped & value) { return value; }

    /// The packed key is stored by value in the hash table, so nothing extra has to be allocated.
    void onNewKey(iterator & it, size_t keys_size, size_t i, StringRefs & keys)
    {
    }

    /// Unpack the 128-bit value back into the key columns, key_sizes[i] bytes per column,
    /// in the same order in which pack128 laid them out.
    static void insertKeyIntoColumns(const_iterator & it, ColumnPlainPtrs & key_columns, size_t keys_size, const Sizes & key_sizes)
    {
        size_t offset = 0;
        for (size_t i = 0; i < keys_size; ++i)
        {
            size_t size = key_sizes[i];
            key_columns[i]->insertData(reinterpret_cast<const char *>(&it->first) + offset, size);
            offset += size;
        }
    }
};
/// For the remaining cases. Aggregates by a 128-bit hash of the key.
/// (Note: strings containing zero bytes in the middle may collide.)
struct AggregationMethodHashed
{
    typedef AggregatedDataHashed Data;
    typedef Data::key_type Key;
    typedef Data::mapped_type Mapped;
    typedef Data::iterator iterator;
    typedef Data::const_iterator const_iterator;

    Data data;
    Arena keys_pool;    /// NOTE Perhaps it would be better to use aggregates_pool instead?

    /// Nothing to cache per block.
    void init(ConstColumnPlainPtrs & key_columns)
    {
    }

    /// The key for row i is a 128-bit hash of all key columns. `keys` is passed to hash128 -
    /// presumably it receives references to the key data for later use by onNewKey; confirm against hash128.
    Key getKey(
        const ConstColumnPlainPtrs & key_columns,
        size_t keys_size,
        size_t i,
        const Sizes & key_sizes,
        StringRefs & keys) const
    {
        return hash128(i, keys_size, key_columns, keys);
    }

    /// Mapped is a pair (saved key refs, aggregate data); the aggregate data is its second member.
    static AggregateDataPtr & getAggregateData(Mapped & value) { return value.second; }
    static const AggregateDataPtr & getAggregateData(const Mapped & value) { return value.second; }

    /// On a new key: copy the original key values into keys_pool, so that the key
    /// can later be restored from the hash table by insertKeyIntoColumns.
    void onNewKey(iterator & it, size_t keys_size, size_t i, StringRefs & keys)
    {
        it->second.first = placeKeysInPool(i, keys_size, keys, keys_pool);
    }

    /// Insert the saved key values from the hash table back into the columns.
    static void insertKeyIntoColumns(const_iterator & it, ColumnPlainPtrs & key_columns, size_t keys_size, const Sizes & key_sizes)
    {
        for (size_t i = 0; i < keys_size; ++i)
            key_columns[i]->insertDataWithTerminatingZero(it->second.first[i].data, it->second.first[i].size);
    }
};
class Aggregator;
struct AggregatedDataVariants : private boost::noncopyable
{
@ -69,6 +308,9 @@ struct AggregatedDataVariants : private boost::noncopyable
* В этом случае, пул не сможет знать, по каким смещениям хранятся объекты.
*/
Aggregator * aggregator = nullptr;
size_t keys_size; /// Количество ключей NOTE нужно ли это поле?
Sizes key_sizes; /// Размеры ключей, если ключи фиксированной длины
/// Пулы для состояний агрегатных функций. Владение потом будет передано в ColumnAggregateFunction.
Arenas aggregates_pools;
@ -78,34 +320,21 @@ struct AggregatedDataVariants : private boost::noncopyable
*/
AggregatedDataWithoutKey without_key = nullptr;
/// Специализация для случая, когда есть один числовой ключ.
/// unique_ptr - для ленивой инициализации (так как иначе HashMap в конструкторе выделяет и зануляет слишком много памяти).
std::unique_ptr<AggregatedDataWithUInt64Key> key64;
/// Специализация для случая, когда есть один строковый ключ.
std::unique_ptr<AggregatedDataWithStringKey> key_string;
Arena string_pool;
size_t keys_size; /// Количество ключей
Sizes key_sizes; /// Размеры ключей, если ключи фиксированной длины
/// Специализация для случая, когда ключи фискированной длины помещаются в 128 бит.
std::unique_ptr<AggregatedDataWithKeys128> keys128;
/** Агрегирует по 128 битному хэшу от ключа.
* (При этом, строки, содержащие нули посередине, могут склеиться.)
*/
std::unique_ptr<AggregatedDataHashed> hashed;
Arena keys_pool;
std::unique_ptr<AggregationMethodKey64> key64;
std::unique_ptr<AggregationMethodString> key_string;
std::unique_ptr<AggregationMethodFixedString> key_fixed_string;
std::unique_ptr<AggregationMethodKeys128> keys128;
std::unique_ptr<AggregationMethodHashed> hashed;
enum Type
{
EMPTY = 0,
WITHOUT_KEY = 1,
KEY_64 = 2,
KEY_STRING = 3,
KEYS_128 = 4,
HASHED = 5,
EMPTY = 0,
WITHOUT_KEY = 1,
KEY_64 = 2,
KEY_STRING = 3,
KEY_FIXED_STRING = 4,
KEYS_128 = 5,
HASHED = 6,
};
Type type = EMPTY;
@ -120,12 +349,13 @@ struct AggregatedDataVariants : private boost::noncopyable
switch (type)
{
case EMPTY: break;
case WITHOUT_KEY: break;
case KEY_64: key64 .reset(new AggregatedDataWithUInt64Key); break;
case KEY_STRING: key_string .reset(new AggregatedDataWithStringKey); break;
case KEYS_128: keys128 .reset(new AggregatedDataWithKeys128); break;
case HASHED: hashed .reset(new AggregatedDataHashed); break;
case EMPTY: break;
case WITHOUT_KEY: break;
case KEY_64: key64 .reset(new AggregationMethodKey64); break;
case KEY_STRING: key_string .reset(new AggregationMethodString); break;
case KEY_FIXED_STRING: key_fixed_string.reset(new AggregationMethodFixedString); break;
case KEYS_128: keys128 .reset(new AggregationMethodKeys128); break;
case HASHED: hashed .reset(new AggregationMethodHashed); break;
default:
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
@ -136,12 +366,13 @@ struct AggregatedDataVariants : private boost::noncopyable
{
switch (type)
{
case EMPTY: return 0;
case WITHOUT_KEY: return 1;
case KEY_64: return key64->size() + (without_key != nullptr);
case KEY_STRING: return key_string->size() + (without_key != nullptr);
case KEYS_128: return keys128->size() + (without_key != nullptr);
case HASHED: return hashed->size() + (without_key != nullptr);
case EMPTY: return 0;
case WITHOUT_KEY: return 1;
case KEY_64: return key64->data.size() + (without_key != nullptr);
case KEY_STRING: return key_string->data.size() + (without_key != nullptr);
case KEY_FIXED_STRING: return key_fixed_string->data.size() + (without_key != nullptr);
case KEYS_128: return keys128->data.size() + (without_key != nullptr);
case HASHED: return hashed->data.size() + (without_key != nullptr);
default:
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
@ -152,12 +383,13 @@ struct AggregatedDataVariants : private boost::noncopyable
{
switch (type)
{
case EMPTY: return "EMPTY";
case WITHOUT_KEY: return "WITHOUT_KEY";
case KEY_64: return "KEY_64";
case KEY_STRING: return "KEY_STRING";
case KEYS_128: return "KEYS_128";
case HASHED: return "HASHED";
case EMPTY: return "EMPTY";
case WITHOUT_KEY: return "WITHOUT_KEY";
case KEY_64: return "KEY_64";
case KEY_STRING: return "KEY_STRING";
case KEY_FIXED_STRING: return "KEY_FIXED_STRING";
case KEYS_128: return "KEYS_128";
case HASHED: return "HASHED";
default:
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
@ -177,7 +409,7 @@ public:
Aggregator(const ColumnNumbers & keys_, const AggregateDescriptions & aggregates_, bool overflow_row_,
size_t max_rows_to_group_by_ = 0, OverflowMode group_by_overflow_mode_ = OverflowMode::THROW)
: keys(keys_), aggregates(aggregates_), aggregates_size(aggregates.size()),
overflow_row(overflow_row_), total_size_of_aggregate_states(0), all_aggregates_has_trivial_destructor(false), initialized(false),
overflow_row(overflow_row_),
max_rows_to_group_by(max_rows_to_group_by_), group_by_overflow_mode(group_by_overflow_mode_),
log(&Logger::get("Aggregator"))
{
@ -189,7 +421,7 @@ public:
Aggregator(const Names & key_names_, const AggregateDescriptions & aggregates_, bool overflow_row_,
size_t max_rows_to_group_by_ = 0, OverflowMode group_by_overflow_mode_ = OverflowMode::THROW)
: key_names(key_names_), aggregates(aggregates_), aggregates_size(aggregates.size()),
overflow_row(overflow_row_), total_size_of_aggregate_states(0), all_aggregates_has_trivial_destructor(false), initialized(false),
overflow_row(overflow_row_),
max_rows_to_group_by(max_rows_to_group_by_), group_by_overflow_mode(group_by_overflow_mode_),
log(&Logger::get("Aggregator"))
{
@ -238,11 +470,11 @@ protected:
bool overflow_row;
Sizes offsets_of_aggregate_states; /// Смещение до n-ой агрегатной функции в строке из агрегатных функций.
size_t total_size_of_aggregate_states; /// Суммарный размер строки из агрегатных функций.
bool all_aggregates_has_trivial_destructor;
size_t total_size_of_aggregate_states = 0; /// Суммарный размер строки из агрегатных функций.
bool all_aggregates_has_trivial_destructor = false;
/// Для инициализации от первого блока при конкуррентном использовании.
bool initialized;
bool initialized = false;
Poco::FastMutex mutex;
size_t max_rows_to_group_by;
@ -264,6 +496,52 @@ protected:
* Используется в обработчике исключений при агрегации, так как RAII в данном случае не применим.
*/
void destroyAggregateStates(AggregatedDataVariants & result);
typedef std::vector<ConstColumnPlainPtrs> AggregateColumns;
typedef std::vector<ColumnAggregateFunction::Container_t *> AggregateColumnsData;
template <typename Method>
void executeImpl(
Method & method,
Arena * aggregates_pool,
size_t rows,
ConstColumnPlainPtrs & key_columns,
AggregateColumns & aggregate_columns,
const Sizes & key_sizes,
StringRefs & keys,
bool no_more_keys,
AggregateDataPtr overflow_row) const;
template <typename Method>
void convertToBlockImpl(
Method & method,
ColumnPlainPtrs & key_columns,
AggregateColumnsData & aggregate_columns,
ColumnPlainPtrs & final_aggregate_columns,
const Sizes & key_sizes,
size_t start_row,
bool final) const;
template <typename Method>
void mergeDataImpl(
Method & method_dst,
Method & method_src) const;
template <typename Method>
void mergeStreamsImpl(
Method & method,
Arena * aggregates_pool,
size_t start_row,
size_t rows,
ConstColumnPlainPtrs & key_columns,
AggregateColumnsData & aggregate_columns,
const Sizes & key_sizes,
StringRefs & keys) const;
template <typename Method>
void destroyImpl(
Method & method) const;
};

View File

@ -3,9 +3,6 @@
#include <statdaemons/Stopwatch.h>
#include <DB/DataTypes/DataTypeAggregateFunction.h>
#include <DB/Columns/ColumnAggregateFunction.h>
#include <DB/Columns/ColumnString.h>
#include <DB/Columns/ColumnFixedString.h>
#include <DB/Columns/ColumnsNumber.h>
#include <DB/AggregateFunctions/AggregateFunctionCount.h>
@ -19,7 +16,16 @@ namespace DB
AggregatedDataVariants::~AggregatedDataVariants()
{
if (aggregator && !aggregator->all_aggregates_has_trivial_destructor)
aggregator->destroyAggregateStates(*this);
{
try
{
aggregator->destroyAggregateStates(*this);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}
@ -85,13 +91,13 @@ void Aggregator::initialize(Block & block)
DataTypes argument_types(arguments_size);
for (size_t j = 0; j < arguments_size; ++j)
argument_types[j] = block.getByPosition(aggregates[i].arguments[j]).type;
col.type = new DataTypeAggregateFunction(aggregates[i].function, argument_types, aggregates[i].parameters);
col.column = new ColumnAggregateFunction(aggregates[i].function);
sample.insert(col);
}
}
}
}
@ -119,26 +125,205 @@ AggregatedDataVariants::Type Aggregator::chooseAggregationMethod(const ConstColu
/// Если есть один числовой ключ, который помещается в 64 бита
if (keys_size == 1 && key_columns[0]->isNumeric())
{
return AggregatedDataVariants::KEY_64;
}
/// Если ключи помещаются в 128 бит, будем использовать хэш-таблицу по упакованным в 128-бит ключам
if (keys_fit_128_bits)
return AggregatedDataVariants::KEYS_128;
/// Если есть один строковый ключ, то используем хэш-таблицу с ним
if (keys_size == 1
&& (dynamic_cast<const ColumnString *>(key_columns[0]) || dynamic_cast<const ColumnFixedString *>(key_columns[0])
|| dynamic_cast<const ColumnConstString *>(key_columns[0])))
if (keys_size == 1 && dynamic_cast<const ColumnString *>(key_columns[0]))
return AggregatedDataVariants::KEY_STRING;
if (keys_size == 1 && dynamic_cast<const ColumnFixedString *>(key_columns[0]))
return AggregatedDataVariants::KEY_FIXED_STRING;
/// Иначе будем агрегировать по хэшу от ключей.
return AggregatedDataVariants::HASHED;
}
/** Aggregate one block of `rows` rows into `method`'s hash table.
  * For each row: build the key, find-or-insert it, lazily create the aggregate
  * function states for new keys (allocated from aggregates_pool), then add the
  * row's values into the states.
  * If no_more_keys is set, new keys are not inserted; such rows either go into
  * overflow_row (if provided) or are skipped.
  * Note the ordering for a new key: onNewKey runs before the state allocation,
  * and getAggregateData(it->second) is assigned before create() - so if create()
  * throws, the table entry does not point at uninitialized memory.
  */
template <typename Method>
void Aggregator::executeImpl(
    Method & method,
    Arena * aggregates_pool,
    size_t rows,
    ConstColumnPlainPtrs & key_columns,
    AggregateColumns & aggregate_columns,
    const Sizes & key_sizes,
    StringRefs & keys,
    bool no_more_keys,
    AggregateDataPtr overflow_row) const
{
    method.init(key_columns);

    /// For each row.
    for (size_t i = 0; i < rows; ++i)
    {
        typename Method::iterator it;
        bool inserted;            /// Inserted a new key, or was this key already present?
        bool overflow = false;    /// The new key did not fit into the hash table because of no_more_keys.

        /// Get the key for insertion into the hash table.
        typename Method::Key key = method.getKey(key_columns, keys_size, i, key_sizes, keys);

        if (!no_more_keys)    /// Insert.
            method.data.emplace(key, it, inserted);
        else
        {
            /// Only add the row if the key is already present.
            inserted = false;
            it = method.data.find(key);
            if (method.data.end() == it)
                overflow = true;
        }

        /// If the key did not fit, and the data does not need to be aggregated into a separate row, there is nothing to do.
        if (overflow && !overflow_row)
            continue;

        /// If a new key was inserted - initialize the aggregate function states, and possibly something related to the key.
        if (inserted)
        {
            method.onNewKey(it, keys_size, i, keys);

            AggregateDataPtr & aggregate_data = Method::getAggregateData(it->second);
            aggregate_data = aggregates_pool->alloc(total_size_of_aggregate_states);

            for (size_t j = 0; j < aggregates_size; ++j)
                aggregate_functions[j]->create(aggregate_data + offsets_of_aggregate_states[j]);
        }

        AggregateDataPtr value = !overflow ? Method::getAggregateData(it->second) : overflow_row;

        /// Add the row's values into the aggregate functions.
        for (size_t j = 0; j < aggregates_size; ++j)
            aggregate_functions[j]->add(value + offsets_of_aggregate_states[j], &aggregate_columns[j][0], i);
    }
}
template <typename Method>
void Aggregator::convertToBlockImpl(
Method & method,
ColumnPlainPtrs & key_columns,
AggregateColumnsData & aggregate_columns,
ColumnPlainPtrs & final_aggregate_columns,
const Sizes & key_sizes,
size_t start_row,
bool final) const
{
if (!final)
{
size_t j = start_row;
for (typename Method::const_iterator it = method.data.begin(); it != method.data.end(); ++it, ++j)
{
method.insertKeyIntoColumns(it, key_columns, keys_size, key_sizes);
for (size_t i = 0; i < aggregates_size; ++i)
(*aggregate_columns[i])[j] = Method::getAggregateData(it->second) + offsets_of_aggregate_states[i];
}
}
else
{
for (typename Method::const_iterator it = method.data.begin(); it != method.data.end(); ++it)
{
method.insertKeyIntoColumns(it, key_columns, keys_size, key_sizes);
for (size_t i = 0; i < aggregates_size; ++i)
aggregate_functions[i]->insertResultInto(
Method::getAggregateData(it->second) + offsets_of_aggregate_states[i],
*final_aggregate_columns[i]);
}
}
}
template <typename Method>
void Aggregator::mergeDataImpl(
Method & method_dst,
Method & method_src) const
{
for (typename Method::iterator it = method_src.data.begin(); it != method_src.data.end(); ++it)
{
typename Method::iterator res_it;
bool inserted;
method_dst.data.emplace(it->first, res_it, inserted);
if (!inserted)
{
for (size_t i = 0; i < aggregates_size; ++i)
{
aggregate_functions[i]->merge(
Method::getAggregateData(res_it->second) + offsets_of_aggregate_states[i],
Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]);
aggregate_functions[i]->destroy(
Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]);
}
}
else
{
res_it->second = it->second;
}
}
}
/** Merge a block of pre-aggregated data (aggregate_columns holds, per aggregate
  * function, a ColumnAggregateFunction container of state pointers) into
  * `method`'s hash table, starting at row `start_row`.
  * New keys get freshly created states allocated from aggregates_pool; then the
  * incoming states are merged into the table's states row by row.
  */
template <typename Method>
void Aggregator::mergeStreamsImpl(
    Method & method,
    Arena * aggregates_pool,
    size_t start_row,
    size_t rows,
    ConstColumnPlainPtrs & key_columns,
    AggregateColumnsData & aggregate_columns,
    const Sizes & key_sizes,
    StringRefs & keys) const
{
    method.init(key_columns);

    /// For each row.
    for (size_t i = start_row; i < rows; ++i)
    {
        typename Method::iterator it;
        bool inserted;    /// Inserted a new key, or was this key already present?

        /// Get the key for insertion into the hash table.
        typename Method::Key key = method.getKey(key_columns, keys_size, i, key_sizes, keys);

        method.data.emplace(key, it, inserted);

        /// For a new key: onNewKey first, then assign the table's pointer before create(),
        /// so a throwing create() cannot leave the entry pointing at uninitialized memory.
        if (inserted)
        {
            method.onNewKey(it, keys_size, i, keys);

            AggregateDataPtr & aggregate_data = Method::getAggregateData(it->second);
            aggregate_data = aggregates_pool->alloc(total_size_of_aggregate_states);

            for (size_t j = 0; j < aggregates_size; ++j)
                aggregate_functions[j]->create(aggregate_data + offsets_of_aggregate_states[j]);
        }

        /// Merge the incoming aggregate function states into the table's states.
        for (size_t j = 0; j < aggregates_size; ++j)
            aggregate_functions[j]->merge(
                Method::getAggregateData(it->second) + offsets_of_aggregate_states[j],
                (*aggregate_columns[j])[i]);
    }
}
template <typename Method>
void Aggregator::destroyImpl(
Method & method) const
{
for (typename Method::const_iterator it = method.data.begin(); it != method.data.end(); ++it)
for (size_t i = 0; i < aggregates_size; ++i)
aggregate_functions[i]->destroy(Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]);
}
/** Результат хранится в оперативке и должен полностью помещаться в оперативку.
*/
void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & result)
@ -146,7 +331,6 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re
StringRefs key(keys_size);
ConstColumnPlainPtrs key_columns(keys_size);
typedef std::vector<ConstColumnPlainPtrs> AggregateColumns;
AggregateColumns aggregate_columns(aggregates_size);
/** Используется, если есть ограничение на максимальное количество строк при агрегации,
@ -244,227 +428,24 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re
}
}
}
AggregateDataPtr overflow_row_ptr = overflow_row ? result.without_key : nullptr;
if (result.type == AggregatedDataVariants::KEY_64)
{
AggregatedDataWithUInt64Key & res = *result.key64;
const IColumn & column = *key_columns[0];
/// Для всех строчек
for (size_t i = 0; i < rows; ++i)
{
/// Строим ключ
UInt64 key = column.get64(i);
AggregatedDataWithUInt64Key::iterator it;
bool inserted;
bool overflow = false;
if (!no_more_keys)
res.emplace(key, it, inserted);
else
{
inserted = false;
it = res.find(key);
if (res.end() == it)
overflow = true;
}
if (overflow && !overflow_row)
continue;
if (inserted)
{
it->second = result.aggregates_pool->alloc(total_size_of_aggregate_states);
for (size_t j = 0; j < aggregates_size; ++j)
aggregate_functions[j]->create(it->second + offsets_of_aggregate_states[j]);
}
AggregateDataPtr value = overflow ? result.without_key : it->second;
/// Добавляем значения
for (size_t j = 0; j < aggregates_size; ++j)
aggregate_functions[j]->add(value + offsets_of_aggregate_states[j], &aggregate_columns[j][0], i);
}
}
executeImpl(*result.key64, result.aggregates_pool, rows, key_columns, aggregate_columns,
result.key_sizes, key, no_more_keys, overflow_row_ptr);
else if (result.type == AggregatedDataVariants::KEY_STRING)
{
AggregatedDataWithStringKey & res = *result.key_string;
const IColumn & column = *key_columns[0];
if (const ColumnString * column_string = dynamic_cast<const ColumnString *>(&column))
{
const ColumnString::Offsets_t & offsets = column_string->getOffsets();
const ColumnString::Chars_t & data = column_string->getChars();
/// Для всех строчек
for (size_t i = 0; i < rows; ++i)
{
/// Строим ключ
StringRef ref(&data[i == 0 ? 0 : offsets[i - 1]], (i == 0 ? offsets[i] : (offsets[i] - offsets[i - 1])) - 1);
AggregatedDataWithStringKey::iterator it;
bool inserted;
bool overflow = false;
if (!no_more_keys)
res.emplace(ref, it, inserted);
else
{
inserted = false;
it = res.find(ref);
if (res.end() == it)
overflow = true;
}
if (overflow && !overflow_row)
continue;
if (inserted)
{
it->first.data = result.string_pool.insert(ref.data, ref.size);
it->second = result.aggregates_pool->alloc(total_size_of_aggregate_states);
for (size_t j = 0; j < aggregates_size; ++j)
aggregate_functions[j]->create(it->second + offsets_of_aggregate_states[j]);
}
AggregateDataPtr value = overflow ? result.without_key : it->second;
/// Добавляем значения
for (size_t j = 0; j < aggregates_size; ++j)
aggregate_functions[j]->add(value + offsets_of_aggregate_states[j], &aggregate_columns[j][0], i);
}
}
else if (const ColumnFixedString * column_string = dynamic_cast<const ColumnFixedString *>(&column))
{
size_t n = column_string->getN();
const ColumnFixedString::Chars_t & data = column_string->getChars();
/// Для всех строчек
for (size_t i = 0; i < rows; ++i)
{
/// Строим ключ
StringRef ref(&data[i * n], n);
AggregatedDataWithStringKey::iterator it;
bool inserted;
bool overflow = false;
if (!no_more_keys)
res.emplace(ref, it, inserted);
else
{
inserted = false;
it = res.find(ref);
if (res.end() == it)
overflow = true;
}
if (overflow && !overflow_row)
continue;
if (inserted)
{
it->first.data = result.string_pool.insert(ref.data, ref.size);
it->second = result.aggregates_pool->alloc(total_size_of_aggregate_states);
for (size_t j = 0; j < aggregates_size; ++j)
aggregate_functions[j]->create(it->second + offsets_of_aggregate_states[j]);
}
AggregateDataPtr value = overflow ? result.without_key : it->second;
/// Добавляем значения
for (size_t j = 0; j < aggregates_size; ++j)
aggregate_functions[j]->add(value + offsets_of_aggregate_states[j], &aggregate_columns[j][0], i);
}
}
else
throw Exception("Illegal type of column when aggregating by string key: " + column.getName(), ErrorCodes::ILLEGAL_COLUMN);
}
executeImpl(*result.key_string, result.aggregates_pool, rows, key_columns, aggregate_columns,
result.key_sizes, key, no_more_keys, overflow_row_ptr);
else if (result.type == AggregatedDataVariants::KEY_FIXED_STRING)
executeImpl(*result.key_fixed_string, result.aggregates_pool, rows, key_columns, aggregate_columns,
result.key_sizes, key, no_more_keys, overflow_row_ptr);
else if (result.type == AggregatedDataVariants::KEYS_128)
{
AggregatedDataWithKeys128 & res = *result.keys128;
/// Для всех строчек
for (size_t i = 0; i < rows; ++i)
{
AggregatedDataWithKeys128::iterator it;
bool inserted;
bool overflow = false;
UInt128 key128 = pack128(i, keys_size, key_columns, key_sizes);
if (!no_more_keys)
res.emplace(key128, it, inserted);
else
{
inserted = false;
it = res.find(key128);
if (res.end() == it)
overflow = true;
}
if (overflow && !overflow_row)
continue;
if (inserted)
{
it->second = result.aggregates_pool->alloc(total_size_of_aggregate_states);
for (size_t j = 0; j < aggregates_size; ++j)
aggregate_functions[j]->create(it->second + offsets_of_aggregate_states[j]);
}
AggregateDataPtr value = overflow ? result.without_key : it->second;
/// Добавляем значения
for (size_t j = 0; j < aggregates_size; ++j)
aggregate_functions[j]->add(value + offsets_of_aggregate_states[j], &aggregate_columns[j][0], i);
}
}
executeImpl(*result.keys128, result.aggregates_pool, rows, key_columns, aggregate_columns,
result.key_sizes, key, no_more_keys, overflow_row_ptr);
else if (result.type == AggregatedDataVariants::HASHED)
{
AggregatedDataHashed & res = *result.hashed;
/// Для всех строчек
for (size_t i = 0; i < rows; ++i)
{
AggregatedDataHashed::iterator it;
bool inserted;
bool overflow = false;
UInt128 key128 = hash128(i, keys_size, key_columns, key);
if (!no_more_keys)
res.emplace(key128, it, inserted);
else
{
inserted = false;
it = res.find(key128);
if (res.end() == it)
overflow = true;
}
if (overflow && !overflow_row)
continue;
if (inserted)
{
it->second.first = placeKeysInPool(i, keys_size, key, result.keys_pool);
it->second.second = result.aggregates_pool->alloc(total_size_of_aggregate_states);
for (size_t j = 0; j < aggregates_size; ++j)
aggregate_functions[j]->create(it->second.second + offsets_of_aggregate_states[j]);
}
AggregateDataPtr value = overflow ? result.without_key : it->second.second;
/// Добавляем значения
for (size_t j = 0; j < aggregates_size; ++j)
aggregate_functions[j]->add(value + offsets_of_aggregate_states[j], &aggregate_columns[j][0], i);
}
}
executeImpl(*result.hashed, result.aggregates_pool, rows, key_columns, aggregate_columns,
result.key_sizes, key, no_more_keys, overflow_row_ptr);
else if (result.type != AggregatedDataVariants::WITHOUT_KEY)
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
@ -506,10 +487,8 @@ Block Aggregator::convertToBlock(AggregatedDataVariants & data_variants, bool fi
if (data_variants.empty())
return Block();
typedef std::vector<ColumnAggregateFunction::Container_t *> AggregateColumns;
ColumnPlainPtrs key_columns(keys_size);
AggregateColumns aggregate_columns(aggregates_size);
AggregateColumnsData aggregate_columns(aggregates_size);
ColumnPlainPtrs final_aggregate_columns(aggregates_size);
for (size_t i = 0; i < keys_size; ++i)
@ -561,130 +540,20 @@ Block Aggregator::convertToBlock(AggregatedDataVariants & data_variants, bool fi
size_t start_row = overflow_row ? 1 : 0;
if (data_variants.type == AggregatedDataVariants::KEY_64)
{
AggregatedDataWithUInt64Key & data = *data_variants.key64;
IColumn & first_column = *key_columns[0];
size_t j = start_row;
if (!final)
{
for (AggregatedDataWithUInt64Key::const_iterator it = data.begin(); it != data.end(); ++it, ++j)
{
first_column.insertData(reinterpret_cast<const char *>(&it->first), sizeof(it->first));
for (size_t i = 0; i < aggregates_size; ++i)
(*aggregate_columns[i])[j] = it->second + offsets_of_aggregate_states[i];
}
}
else
{
for (AggregatedDataWithUInt64Key::const_iterator it = data.begin(); it != data.end(); ++it, ++j)
{
first_column.insertData(reinterpret_cast<const char *>(&it->first), sizeof(it->first));
for (size_t i = 0; i < aggregates_size; ++i)
aggregate_functions[i]->insertResultInto(it->second + offsets_of_aggregate_states[i], *final_aggregate_columns[i]);
}
}
}
convertToBlockImpl(*data_variants.key64, key_columns, aggregate_columns,
final_aggregate_columns, data_variants.key_sizes, start_row, final);
else if (data_variants.type == AggregatedDataVariants::KEY_STRING)
{
AggregatedDataWithStringKey & data = *data_variants.key_string;
IColumn & first_column = *key_columns[0];
size_t j = start_row;
if (!final)
{
for (AggregatedDataWithStringKey::const_iterator it = data.begin(); it != data.end(); ++it, ++j)
{
first_column.insertData(it->first.data, it->first.size);
for (size_t i = 0; i < aggregates_size; ++i)
(*aggregate_columns[i])[j] = it->second + offsets_of_aggregate_states[i];
}
}
else
{
for (AggregatedDataWithStringKey::const_iterator it = data.begin(); it != data.end(); ++it, ++j)
{
first_column.insertData(it->first.data, it->first.size);
for (size_t i = 0; i < aggregates_size; ++i)
aggregate_functions[i]->insertResultInto(it->second + offsets_of_aggregate_states[i], *final_aggregate_columns[i]);
}
}
}
convertToBlockImpl(*data_variants.key_string, key_columns, aggregate_columns,
final_aggregate_columns, data_variants.key_sizes, start_row, final);
else if (data_variants.type == AggregatedDataVariants::KEY_FIXED_STRING)
convertToBlockImpl(*data_variants.key_fixed_string, key_columns, aggregate_columns,
final_aggregate_columns, data_variants.key_sizes, start_row, final);
else if (data_variants.type == AggregatedDataVariants::KEYS_128)
{
AggregatedDataWithKeys128 & data = *data_variants.keys128;
size_t j = start_row;
if (!final)
{
for (AggregatedDataWithKeys128::const_iterator it = data.begin(); it != data.end(); ++it, ++j)
{
size_t offset = 0;
for (size_t i = 0; i < keys_size; ++i)
{
size_t size = data_variants.key_sizes[i];
key_columns[i]->insertData(reinterpret_cast<const char *>(&it->first) + offset, size);
offset += size;
}
for (size_t i = 0; i < aggregates_size; ++i)
(*aggregate_columns[i])[j] = it->second + offsets_of_aggregate_states[i];
}
}
else
{
for (AggregatedDataWithKeys128::const_iterator it = data.begin(); it != data.end(); ++it, ++j)
{
size_t offset = 0;
for (size_t i = 0; i < keys_size; ++i)
{
size_t size = data_variants.key_sizes[i];
key_columns[i]->insertData(reinterpret_cast<const char *>(&it->first) + offset, size);
offset += size;
}
for (size_t i = 0; i < aggregates_size; ++i)
aggregate_functions[i]->insertResultInto(it->second + offsets_of_aggregate_states[i], *final_aggregate_columns[i]);
}
}
}
convertToBlockImpl(*data_variants.keys128, key_columns, aggregate_columns,
final_aggregate_columns, data_variants.key_sizes, start_row, final);
else if (data_variants.type == AggregatedDataVariants::HASHED)
{
AggregatedDataHashed & data = *data_variants.hashed;
size_t j = start_row;
if (!final)
{
for (AggregatedDataHashed::const_iterator it = data.begin(); it != data.end(); ++it, ++j)
{
for (size_t i = 0; i < keys_size; ++i)
key_columns[i]->insertDataWithTerminatingZero(it->second.first[i].data, it->second.first[i].size);
for (size_t i = 0; i < aggregates_size; ++i)
(*aggregate_columns[i])[j] = it->second.second + offsets_of_aggregate_states[i];
}
}
else
{
for (AggregatedDataHashed::const_iterator it = data.begin(); it != data.end(); ++it, ++j)
{
for (size_t i = 0; i < keys_size; ++i)
key_columns[i]->insertDataWithTerminatingZero(it->second.first[i].data, it->second.first[i].size);
for (size_t i = 0; i < aggregates_size; ++i)
aggregate_functions[i]->insertResultInto(it->second.second + offsets_of_aggregate_states[i], *final_aggregate_columns[i]);
}
}
}
convertToBlockImpl(*data_variants.hashed, key_columns, aggregate_columns,
final_aggregate_columns, data_variants.key_sizes, start_row, final);
else if (data_variants.type != AggregatedDataVariants::WITHOUT_KEY)
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
@ -755,103 +624,17 @@ AggregatedDataVariantsPtr Aggregator::merge(ManyAggregatedDataVariants & data_va
aggregate_functions[i]->destroy(current_data + offsets_of_aggregate_states[i]);
}
}
if (res->type == AggregatedDataVariants::KEY_64)
{
AggregatedDataWithUInt64Key & res_data = *res->key64;
AggregatedDataWithUInt64Key & current_data = *current.key64;
for (AggregatedDataWithUInt64Key::const_iterator it = current_data.begin(); it != current_data.end(); ++it)
{
AggregatedDataWithUInt64Key::iterator res_it;
bool inserted;
res_data.emplace(it->first, res_it, inserted);
if (!inserted)
{
for (size_t i = 0; i < aggregates_size; ++i)
{
aggregate_functions[i]->merge(res_it->second + offsets_of_aggregate_states[i], it->second + offsets_of_aggregate_states[i]);
aggregate_functions[i]->destroy(it->second + offsets_of_aggregate_states[i]);
}
}
else
res_it->second = it->second;
}
}
mergeDataImpl(*res->key64, *current.key64);
else if (res->type == AggregatedDataVariants::KEY_STRING)
{
AggregatedDataWithStringKey & res_data = *res->key_string;
AggregatedDataWithStringKey & current_data = *current.key_string;
for (AggregatedDataWithStringKey::const_iterator it = current_data.begin(); it != current_data.end(); ++it)
{
AggregatedDataWithStringKey::iterator res_it;
bool inserted;
res_data.emplace(it->first, res_it, inserted);
if (!inserted)
{
for (size_t i = 0; i < aggregates_size; ++i)
{
aggregate_functions[i]->merge(res_it->second + offsets_of_aggregate_states[i], it->second + offsets_of_aggregate_states[i]);
aggregate_functions[i]->destroy(it->second + offsets_of_aggregate_states[i]);
}
}
else
res_it->second = it->second;
}
}
mergeDataImpl(*res->key_string, *current.key_string);
else if (res->type == AggregatedDataVariants::KEY_FIXED_STRING)
mergeDataImpl(*res->key_fixed_string, *current.key_fixed_string);
else if (res->type == AggregatedDataVariants::KEYS_128)
{
AggregatedDataWithKeys128 & res_data = *res->keys128;
AggregatedDataWithKeys128 & current_data = *current.keys128;
for (AggregatedDataWithKeys128::iterator it = current_data.begin(); it != current_data.end(); ++it)
{
AggregatedDataWithKeys128::iterator res_it;
bool inserted;
res_data.emplace(it->first, res_it, inserted);
if (!inserted)
{
for (size_t i = 0; i < aggregates_size; ++i)
{
aggregate_functions[i]->merge(res_it->second + offsets_of_aggregate_states[i], it->second + offsets_of_aggregate_states[i]);
aggregate_functions[i]->destroy(it->second + offsets_of_aggregate_states[i]);
}
}
else
{
res_it->second = it->second;
}
}
}
mergeDataImpl(*res->keys128, *current.keys128);
else if (res->type == AggregatedDataVariants::HASHED)
{
AggregatedDataHashed & res_data = *res->hashed;
AggregatedDataHashed & current_data = *current.hashed;
for (AggregatedDataHashed::iterator it = current_data.begin(); it != current_data.end(); ++it)
{
AggregatedDataHashed::iterator res_it;
bool inserted;
res_data.emplace(it->first, res_it, inserted);
if (!inserted)
{
for (size_t i = 0; i < aggregates_size; ++i)
{
aggregate_functions[i]->merge(res_it->second.second + offsets_of_aggregate_states[i], it->second.second + offsets_of_aggregate_states[i]);
aggregate_functions[i]->destroy(it->second.second + offsets_of_aggregate_states[i]);
}
}
else
{
res_it->second = it->second;
}
}
}
mergeDataImpl(*res->hashed, *current.hashed);
else if (res->type != AggregatedDataVariants::WITHOUT_KEY)
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
@ -877,9 +660,7 @@ void Aggregator::merge(BlockInputStreamPtr stream, AggregatedDataVariants & resu
StringRefs key(keys_size);
ConstColumnPlainPtrs key_columns(keys_size);
typedef ColumnAggregateFunction::Container_t * AggregateColumn;
typedef std::vector<AggregateColumn> AggregateColumns;
AggregateColumns aggregate_columns(aggregates_size);
AggregateColumnsData aggregate_columns(aggregates_size);
Block empty_block;
initialize(empty_block);
@ -935,148 +716,15 @@ void Aggregator::merge(BlockInputStreamPtr stream, AggregatedDataVariants & resu
size_t start_row = overflow_row ? 1 : 0;
if (result.type == AggregatedDataVariants::KEY_64)
{
AggregatedDataWithUInt64Key & res = *result.key64;
const IColumn & column = *key_columns[0];
/// Для всех строчек
for (size_t i = start_row; i < rows; ++i)
{
/// Строим ключ
UInt64 key = column.get64(i);
AggregatedDataWithUInt64Key::iterator it;
bool inserted;
res.emplace(key, it, inserted);
if (inserted)
{
it->second = result.aggregates_pool->alloc(total_size_of_aggregate_states);
for (size_t j = 0; j < aggregates_size; ++j)
aggregate_functions[j]->create(it->second + offsets_of_aggregate_states[j]);
}
/// Добавляем значения
for (size_t j = 0; j < aggregates_size; ++j)
aggregate_functions[j]->merge(it->second + offsets_of_aggregate_states[j], (*aggregate_columns[j])[i]);
}
}
mergeStreamsImpl(*result.key64, result.aggregates_pool, start_row, rows, key_columns, aggregate_columns, key_sizes, key);
else if (result.type == AggregatedDataVariants::KEY_STRING)
{
AggregatedDataWithStringKey & res = *result.key_string;
const IColumn & column = *key_columns[0];
if (const ColumnString * column_string = dynamic_cast<const ColumnString *>(&column))
{
const ColumnString::Offsets_t & offsets = column_string->getOffsets();
const ColumnString::Chars_t & data = column_string->getChars();
/// Для всех строчек
for (size_t i = start_row; i < rows; ++i)
{
/// Строим ключ
StringRef ref(&data[i == 0 ? 0 : offsets[i - 1]], (i == 0 ? offsets[i] : (offsets[i] - offsets[i - 1])) - 1);
AggregatedDataWithStringKey::iterator it;
bool inserted;
res.emplace(ref, it, inserted);
if (inserted)
{
it->first.data = result.string_pool.insert(ref.data, ref.size);
it->second = result.aggregates_pool->alloc(total_size_of_aggregate_states);
for (size_t j = 0; j < aggregates_size; ++j)
aggregate_functions[j]->create(it->second + offsets_of_aggregate_states[j]);
}
/// Добавляем значения
for (size_t j = 0; j < aggregates_size; ++j)
aggregate_functions[j]->merge(it->second + offsets_of_aggregate_states[j], (*aggregate_columns[j])[i]);
}
}
else if (const ColumnFixedString * column_string = dynamic_cast<const ColumnFixedString *>(&column))
{
size_t n = column_string->getN();
const ColumnFixedString::Chars_t & data = column_string->getChars();
/// Для всех строчек
for (size_t i = start_row; i < rows; ++i)
{
/// Строим ключ
StringRef ref(&data[i * n], n);
AggregatedDataWithStringKey::iterator it;
bool inserted;
res.emplace(ref, it, inserted);
if (inserted)
{
it->first.data = result.string_pool.insert(ref.data, ref.size);
it->second = result.aggregates_pool->alloc(total_size_of_aggregate_states);
for (size_t j = 0; j < aggregates_size; ++j)
aggregate_functions[j]->create(it->second + offsets_of_aggregate_states[j]);
}
/// Добавляем значения
for (size_t j = 0; j < aggregates_size; ++j)
aggregate_functions[j]->merge(it->second + offsets_of_aggregate_states[j], (*aggregate_columns[j])[i]);
}
}
else
throw Exception("Illegal type of column when aggregating by string key: " + column.getName(), ErrorCodes::ILLEGAL_COLUMN);
}
mergeStreamsImpl(*result.key_string, result.aggregates_pool, start_row, rows, key_columns, aggregate_columns, key_sizes, key);
else if (result.type == AggregatedDataVariants::KEY_FIXED_STRING)
mergeStreamsImpl(*result.key_fixed_string, result.aggregates_pool, start_row, rows, key_columns, aggregate_columns, key_sizes, key);
else if (result.type == AggregatedDataVariants::KEYS_128)
{
AggregatedDataWithKeys128 & res = *result.keys128;
/// Для всех строчек
for (size_t i = start_row; i < rows; ++i)
{
AggregatedDataWithKeys128::iterator it;
bool inserted;
res.emplace(pack128(i, keys_size, key_columns, key_sizes), it, inserted);
if (inserted)
{
it->second = result.aggregates_pool->alloc(total_size_of_aggregate_states);
for (size_t j = 0; j < aggregates_size; ++j)
aggregate_functions[j]->create(it->second + offsets_of_aggregate_states[j]);
}
/// Добавляем значения
for (size_t j = 0; j < aggregates_size; ++j)
aggregate_functions[j]->merge(it->second + offsets_of_aggregate_states[j], (*aggregate_columns[j])[i]);
}
}
mergeStreamsImpl(*result.keys128, result.aggregates_pool, start_row, rows, key_columns, aggregate_columns, key_sizes, key);
else if (result.type == AggregatedDataVariants::HASHED)
{
AggregatedDataHashed & res = *result.hashed;
/// Для всех строчек
for (size_t i = start_row; i < rows; ++i)
{
AggregatedDataHashed::iterator it;
bool inserted;
res.emplace(hash128(i, keys_size, key_columns, key), it, inserted);
if (inserted)
{
it->second.first = placeKeysInPool(i, keys_size, key, result.keys_pool);
it->second.second = result.aggregates_pool->alloc(total_size_of_aggregate_states);
for (size_t j = 0; j < aggregates_size; ++j)
aggregate_functions[j]->create(it->second.second + offsets_of_aggregate_states[j]);
}
/// Добавляем значения
for (size_t j = 0; j < aggregates_size; ++j)
aggregate_functions[j]->merge(it->second.second + offsets_of_aggregate_states[j], (*aggregate_columns[j])[i]);
}
}
mergeStreamsImpl(*result.hashed, result.aggregates_pool, start_row, rows, key_columns, aggregate_columns, key_sizes, key);
else if (result.type != AggregatedDataVariants::WITHOUT_KEY)
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
@ -1100,30 +748,19 @@ void Aggregator::destroyAggregateStates(AggregatedDataVariants & result)
for (size_t i = 0; i < aggregates_size; ++i)
aggregate_functions[i]->destroy(res_data + offsets_of_aggregate_states[i]);
}
if (result.type == AggregatedDataVariants::KEY_64)
{
AggregatedDataWithUInt64Key & res_data = *result.key64;
for (AggregatedDataWithUInt64Key::const_iterator it = res_data.begin(); it != res_data.end(); ++it)
for (size_t i = 0; i < aggregates_size; ++i)
aggregate_functions[i]->destroy(it->second + offsets_of_aggregate_states[i]);
}
destroyImpl(*result.key64);
else if (result.type == AggregatedDataVariants::KEY_STRING)
{
AggregatedDataWithStringKey & res_data = *result.key_string;
for (AggregatedDataWithStringKey::const_iterator it = res_data.begin(); it != res_data.end(); ++it)
for (size_t i = 0; i < aggregates_size; ++i)
aggregate_functions[i]->destroy(it->second + offsets_of_aggregate_states[i]);
}
destroyImpl(*result.key_string);
else if (result.type == AggregatedDataVariants::KEY_FIXED_STRING)
destroyImpl(*result.key_fixed_string);
else if (result.type == AggregatedDataVariants::KEYS_128)
destroyImpl(*result.keys128);
else if (result.type == AggregatedDataVariants::HASHED)
{
AggregatedDataHashed & res_data = *result.hashed;
for (AggregatedDataHashed::iterator it = res_data.begin(); it != res_data.end(); ++it)
for (size_t i = 0; i < aggregates_size; ++i)
aggregate_functions[i]->destroy(it->second.second + offsets_of_aggregate_states[i]);
}
destroyImpl(*result.hashed);
else if (result.type != AggregatedDataVariants::WITHOUT_KEY)
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
}

View File

@ -14,7 +14,7 @@ namespace DB
void SplittingAggregator::execute(BlockInputStreamPtr stream, ManyAggregatedDataVariants & results)
{
Stopwatch watch;
//Stopwatch watch;
/// Читаем все данные
while (Block block = stream->read())
@ -49,17 +49,14 @@ void SplittingAggregator::execute(BlockInputStreamPtr stream, ManyAggregatedData
/// Каким способом выполнять агрегацию?
if (method == AggregatedDataVariants::EMPTY)
{
method = chooseAggregationMethod(key_columns, key_sizes);
// LOG_TRACE(log, "Aggregation method: " << result.getMethodName());
}
/// Подготавливаем массивы, куда будут складываться ключи или хэши от ключей.
if (method == AggregatedDataVariants::KEY_64)
{
keys64.resize(rows);
}
else if (method == AggregatedDataVariants::KEY_STRING)
else if (method == AggregatedDataVariants::KEY_STRING || method == AggregatedDataVariants::KEY_FIXED_STRING)
{
hashes64.resize(rows);
string_refs.resize(rows);
@ -93,9 +90,9 @@ void SplittingAggregator::execute(BlockInputStreamPtr stream, ManyAggregatedData
/// Параллельно вычисляем хэши и ключи.
LOG_TRACE(log, "Calculating keys and hashes.");
// LOG_TRACE(log, "Calculating keys and hashes.");
watch.start();
// watch.start();
for (size_t thread_no = 0; thread_no < threads; ++thread_no)
pool.schedule(boost::bind(&SplittingAggregator::calculateHashesThread, this,
@ -109,12 +106,12 @@ void SplittingAggregator::execute(BlockInputStreamPtr stream, ManyAggregatedData
rethrowFirstException(exceptions);
LOG_TRACE(log, "Calculated keys and hashes in " << std::fixed << std::setprecision(2) << watch.elapsedSeconds() << " sec.");
watch.restart();
// LOG_TRACE(log, "Calculated keys and hashes in " << std::fixed << std::setprecision(2) << watch.elapsedSeconds() << " sec.");
// watch.restart();
/// Параллельно агрегируем в независимые хэш-таблицы
LOG_TRACE(log, "Parallel aggregating.");
// LOG_TRACE(log, "Parallel aggregating.");
for (size_t thread_no = 0; thread_no < threads; ++thread_no)
pool.schedule(boost::bind(&SplittingAggregator::aggregateThread, this,
@ -128,20 +125,13 @@ void SplittingAggregator::execute(BlockInputStreamPtr stream, ManyAggregatedData
rethrowFirstException(exceptions);
LOG_TRACE(log, "Parallel aggregated in " << std::fixed << std::setprecision(2) << watch.elapsedSeconds() << " sec.");
// LOG_TRACE(log, "Parallel aggregated in " << std::fixed << std::setprecision(2) << watch.elapsedSeconds() << " sec.");
/// Проверка ограничений
if (max_rows_to_group_by && size_of_all_results > max_rows_to_group_by && group_by_overflow_mode == OverflowMode::BREAK)
break;
}
/* double elapsed_seconds = watch.elapsedSeconds();
size_t rows = result.size();
LOG_TRACE(log, std::fixed << std::setprecision(3)
<< "Aggregated. " << src_rows << " to " << rows << " rows (from " << src_bytes / 1048576.0 << " MiB)"
<< " in " << elapsed_seconds << " sec."
<< " (" << src_rows / elapsed_seconds << " rows/sec., " << src_bytes / elapsed_seconds / 1048576.0 << " MiB/sec.)");*/
}
@ -188,33 +178,32 @@ void SplittingAggregator::calculateHashesThread(Block & block, size_t begin, siz
else if (method == AggregatedDataVariants::KEY_STRING)
{
const IColumn & column = *key_columns[0];
const ColumnString & column_string = dynamic_cast<const ColumnString &>(column);
if (const ColumnString * column_string = dynamic_cast<const ColumnString *>(&column))
const ColumnString::Offsets_t & offsets = column_string.getOffsets();
const ColumnString::Chars_t & data = column_string.getChars();
for (size_t i = begin; i < end; ++i)
{
const ColumnString::Offsets_t & offsets = column_string->getOffsets();
const ColumnString::Chars_t & data = column_string->getChars();
for (size_t i = begin; i < end; ++i)
{
string_refs[i] = StringRef(&data[i == 0 ? 0 : offsets[i - 1]], (i == 0 ? offsets[i] : (offsets[i] - offsets[i - 1])) - 1);
hashes64[i] = hash_func_string(string_refs[i]);
thread_nums[i] = (hashes64[i] >> 32) % threads;
}
string_refs[i] = StringRef(&data[i == 0 ? 0 : offsets[i - 1]], (i == 0 ? offsets[i] : (offsets[i] - offsets[i - 1])) - 1);
hashes64[i] = hash_func_string(string_refs[i]);
thread_nums[i] = (hashes64[i] >> 32) % threads;
}
else if (const ColumnFixedString * column_string = dynamic_cast<const ColumnFixedString *>(&column))
}
else if (method == AggregatedDataVariants::KEY_FIXED_STRING)
{
const IColumn & column = *key_columns[0];
const ColumnFixedString & column_string = dynamic_cast<const ColumnFixedString &>(column);
size_t n = column_string.getN();
const ColumnFixedString::Chars_t & data = column_string.getChars();
for (size_t i = begin; i < end; ++i)
{
size_t n = column_string->getN();
const ColumnFixedString::Chars_t & data = column_string->getChars();
for (size_t i = begin; i < end; ++i)
{
string_refs[i] = StringRef(&data[i * n], n);
hashes64[i] = hash_func_string(string_refs[i]);
thread_nums[i] = (hashes64[i] >> 32) % threads;
}
string_refs[i] = StringRef(&data[i * n], n);
hashes64[i] = hash_func_string(string_refs[i]);
thread_nums[i] = (hashes64[i] >> 32) % threads;
}
else
throw Exception("Illegal type of column when aggregating by string key: " + column.getName(), ErrorCodes::ILLEGAL_COLUMN);
}
else if (method == AggregatedDataVariants::KEYS_128)
{
@ -261,7 +250,7 @@ void SplittingAggregator::aggregateThread(
if (method == AggregatedDataVariants::KEY_64)
{
AggregatedDataWithUInt64Key & res = *result.key64;
AggregatedDataWithUInt64Key & res = result.key64->data;
for (size_t i = 0; i < rows; ++i)
{
@ -299,7 +288,7 @@ void SplittingAggregator::aggregateThread(
}
else if (method == AggregatedDataVariants::KEY_STRING)
{
AggregatedDataWithStringKey & res = *result.key_string;
AggregatedDataWithStringKey & res = result.key_string->data;
for (size_t i = 0; i < rows; ++i)
{
@ -323,7 +312,45 @@ void SplittingAggregator::aggregateThread(
if (inserted)
{
it->first.data = result.string_pool.insert(ref.data, ref.size);
it->first.data = result.key_string->string_pool.insert(ref.data, ref.size);
it->second = result.aggregates_pool->alloc(total_size_of_aggregate_states);
for (size_t j = 0; j < aggregates_size; ++j)
aggregate_functions[j]->create(it->second + offsets_of_aggregate_states[j]);
}
/// Добавляем значения
for (size_t j = 0; j < aggregates_size; ++j)
aggregate_functions[j]->add(it->second + offsets_of_aggregate_states[j], &aggregate_columns[j][0], i);
}
}
else if (method == AggregatedDataVariants::KEY_FIXED_STRING)
{
AggregatedDataWithStringKey & res = result.key_fixed_string->data;
for (size_t i = 0; i < rows; ++i)
{
if (thread_nums[i] != thread_no)
continue;
AggregatedDataWithStringKey::iterator it;
bool inserted;
StringRef ref = string_refs[i];
if (!no_more_keys)
res.emplace(ref, it, inserted, hashes64[i]);
else
{
inserted = false;
it = res.find(ref);
if (res.end() == it)
continue;
}
if (inserted)
{
it->first.data = result.key_fixed_string->string_pool.insert(ref.data, ref.size);
it->second = result.aggregates_pool->alloc(total_size_of_aggregate_states);
for (size_t j = 0; j < aggregates_size; ++j)
@ -337,7 +364,7 @@ void SplittingAggregator::aggregateThread(
}
else if (method == AggregatedDataVariants::KEYS_128)
{
AggregatedDataWithKeys128 & res = *result.keys128;
AggregatedDataWithKeys128 & res = result.keys128->data;
for (size_t i = 0; i < rows; ++i)
{
@ -374,7 +401,7 @@ void SplittingAggregator::aggregateThread(
else if (method == AggregatedDataVariants::HASHED)
{
StringRefs key(keys_size);
AggregatedDataHashed & res = *result.hashed;
AggregatedDataHashed & res = result.hashed->data;
for (size_t i = 0; i < rows; ++i)
{
@ -397,7 +424,7 @@ void SplittingAggregator::aggregateThread(
if (inserted)
{
it->second.first = extractKeysAndPlaceInPool(i, keys_size, key_columns, key, result.keys_pool);
it->second.first = extractKeysAndPlaceInPool(i, keys_size, key_columns, key, result.hashed->keys_pool);
it->second.second = result.aggregates_pool->alloc(total_size_of_aggregate_states);
for (size_t j = 0; j < aggregates_size; ++j)