diff --git a/dbms/include/DB/Columns/ColumnAggregateFunction.h b/dbms/include/DB/Columns/ColumnAggregateFunction.h index 73c5d87cf7e..ed12ffb5477 100644 --- a/dbms/include/DB/Columns/ColumnAggregateFunction.h +++ b/dbms/include/DB/Columns/ColumnAggregateFunction.h @@ -11,25 +11,35 @@ namespace DB { /** Столбец, хранящий состояния агрегатных функций. - * Состояния агрегатных функций хранятся в пуле (arena), а в массиве (ColumnVector) хранятся указатели на них. + * Состояния агрегатных функций хранятся в пуле (arena), + * (возможно, в нескольких) + * а в массиве (ColumnVector) хранятся указатели на них. * Столбец захватывает владение пулом и всеми агрегатными функциями, * которые в него переданы (уничтожает их в дестркуторе). */ class ColumnAggregateFunction : public ColumnVector { private: + typedef SharedPtr ArenaPtr; + typedef std::vector Arenas; + const AggregateFunctionPtr func; - SharedPtr arena; + Arenas arenas; public: - ColumnAggregateFunction(AggregateFunctionPtr & func_, SharedPtr & arena_) + ColumnAggregateFunction(AggregateFunctionPtr & func_) { - set(func_, arena_); + set(func_); } - void set(AggregateFunctionPtr & func_, SharedPtr & arena_) + void set(AggregateFunctionPtr & func_) { func = func_; - arena = arena_; + } + + /// Захватить владение ареной. + void addArena(ArenaPtr & arena_) + { + arenas.push_back(arena_); } ~ColumnAggregateFunction() diff --git a/dbms/include/DB/DataTypes/FieldToDataType.h b/dbms/include/DB/DataTypes/FieldToDataType.h index 36f7e200fa1..65c02259a27 100644 --- a/dbms/include/DB/DataTypes/FieldToDataType.h +++ b/dbms/include/DB/DataTypes/FieldToDataType.h @@ -44,11 +44,6 @@ public: return new DataTypeString; } - DataTypePtr operator() (const AggregateFunctionPlainPtr & x) const - { - throw Exception("Cannot get DataType for AggregateFunction Field", ErrorCodes::NOT_IMPLEMENTED); - } - DataTypePtr operator() (const Array & x) const { return new DataTypeArray(apply_visitor(FieldToDataType(), x.at(0))); diff --git a/dbms/include/DB/Interpreters/AggregationCommon.h b/dbms/include/DB/Interpreters/AggregationCommon.h index 0f7769893c8..104d06d1d88 100644 --- a/dbms/include/DB/Interpreters/AggregationCommon.h +++ b/dbms/include/DB/Interpreters/AggregationCommon.h @@ -44,35 +44,6 @@ struct StringHash }; -/** Преобразование значения в 64 бита. Для чисел - однозначное, для строк - некриптографический хэш. */ -class FieldVisitorToUInt64 : public StaticVisitor -{ -public: - FieldVisitorToUInt64() {} - - UInt64 operator() (const Null & x) const { return 0; } - UInt64 operator() (const UInt64 & x) const { return x; } - UInt64 operator() (const Int64 & x) const { return x; } - - UInt64 operator() (const Float64 & x) const - { - UInt64 res = 0; - memcpy(reinterpret_cast(&res), reinterpret_cast(&x), sizeof(x)); - return res; - } - - UInt64 operator() (const String & x) const - { - return CityHash64(x.data(), x.size()); - } - - UInt64 operator() (const Array & x) const - { - throw Exception("Cannot aggregate by array", ErrorCodes::ILLEGAL_KEY_OF_AGGREGATION); - } -}; - - typedef std::vector Sizes; diff --git a/dbms/include/DB/Interpreters/Aggregator.h b/dbms/include/DB/Interpreters/Aggregator.h index 2ac204a49c3..5c2a05e69ef 100644 --- a/dbms/include/DB/Interpreters/Aggregator.h +++ b/dbms/include/DB/Interpreters/Aggregator.h @@ -49,6 +49,9 @@ typedef HashMap, UInt128Hash, UInt128Z struct AggregatedDataVariants { + /// Пул для состояний агрегатных функций. Владение потом будет передано в ColumnAggregateFunction. + SharedPtr aggregates_pool; + /// Наиболее общий вариант. Самый медленный. На данный момент, не используется. AggregatedData generic; @@ -80,7 +83,7 @@ struct AggregatedDataVariants }; Type type; - AggregatedDataVariants() : type(EMPTY) {} + AggregatedDataVariants() : aggregates_pool(new Arena), without_key(NULL), type(EMPTY) {} bool empty() const { return type == EMPTY; } size_t size() const @@ -111,7 +114,8 @@ class Aggregator public: Aggregator(const ColumnNumbers & keys_, AggregateDescriptions & aggregates_, size_t max_rows_to_group_by_ = 0, Limits::OverflowMode group_by_overflow_mode_ = Limits::THROW) - : keys(keys_), aggregates(aggregates_), keys_size(keys.size()), aggregates_size(aggregates.size()), initialized(false), + : keys(keys_), aggregates(aggregates_), keys_size(keys.size()), aggregates_size(aggregates.size()), + total_size_of_aggregate_states(0), initialized(false), max_rows_to_group_by(max_rows_to_group_by_), group_by_overflow_mode(group_by_overflow_mode_), log(&Logger::get("Aggregator")) { @@ -119,7 +123,8 @@ public: Aggregator(const Names & key_names_, AggregateDescriptions & aggregates_, size_t max_rows_to_group_by_ = 0, Limits::OverflowMode group_by_overflow_mode_ = Limits::THROW) - : key_names(key_names_), aggregates(aggregates_), keys_size(key_names.size()), aggregates_size(aggregates.size()), initialized(false), + : key_names(key_names_), aggregates(aggregates_), keys_size(key_names.size()), aggregates_size(aggregates.size()), + total_size_of_aggregate_states(0), initialized(false), max_rows_to_group_by(max_rows_to_group_by_), group_by_overflow_mode(group_by_overflow_mode_), log(&Logger::get("Aggregator")) { @@ -146,9 +151,14 @@ private: ColumnNumbers keys; Names key_names; AggregateDescriptions aggregates; + std::vector aggregate_functions; size_t keys_size; size_t aggregates_size; + Sizes sizes_of_aggregate_states; /// Размеры состояний агрегатных функций - для выделения памяти для них в пуле. + Sizes offsets_of_aggregate_states; /// Смещение до n-ой агрегатной функции в строке из агрегатных функций. + size_t total_size_of_aggregate_states; /// Суммарный размер строки из агрегатных функций. + /// Для инициализации от первого блока при конкуррентном использовании. bool initialized; Poco::FastMutex mutex; diff --git a/dbms/src/Core/tests/field.cpp b/dbms/src/Core/tests/field.cpp index f614faad4e8..18d5eb2c86f 100644 --- a/dbms/src/Core/tests/field.cpp +++ b/dbms/src/Core/tests/field.cpp @@ -26,9 +26,6 @@ int main(int argc, char ** argv) field = DB::Null(); std::cerr << DB::apply_visitor(to_string, field) << std::endl; - field = DB::Field::AggregateFunctionPlainPtr(NULL); - std::cerr << DB::apply_visitor(to_string, field) << std::endl; - DB::Field field2; field2 = field; std::cerr << DB::apply_visitor(to_string, field2) << std::endl; @@ -37,7 +34,6 @@ int main(int argc, char ** argv) array.push_back(DB::UInt64(123)); array.push_back(DB::Int64(-123)); array.push_back(DB::String("Hello")); - array.push_back(DB::Field::AggregateFunctionPlainPtr(NULL)); field = array; std::cerr << DB::apply_visitor(to_string, field) << std::endl; diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index 8894ab806b8..a3565c612aa 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -37,17 +37,21 @@ void Aggregator::initialize(Block & block) for (Names::const_iterator jt = it->argument_names.begin(); jt != it->argument_names.end(); ++jt) it->arguments.push_back(block.getPositionByName(*jt)); + aggregate_functions.resize(aggregates_size); + for (size_t i = 0; i < aggregates_size; ++i) + aggregate_functions[i] = *aggregates[i].function; + /// Создадим пример блока, описывающего результат if (!sample) { - for (size_t i = 0, size = keys_size; i < size; ++i) + for (size_t i = 0; i < keys_size; ++i) { sample.insert(block.getByPosition(keys[i]).cloneEmpty()); if (sample.getByPosition(i).column->isConst()) sample.getByPosition(i).column = dynamic_cast(*sample.getByPosition(i).column).convertToFullColumn(); } - for (size_t i = 0, size = aggregates_size; i < size; ++i) + for (size_t i = 0; i < aggregates_size; ++i) { ColumnWithNameAndType col; col.name = aggregates[i].column_name; @@ -58,7 +62,7 @@ void Aggregator::initialize(Block & block) argument_types[j] = block.getByPosition(aggregates[i].arguments[j]).type; col.type = new DataTypeAggregateFunction(aggregates[i].function, argument_types); - col.column = new ColumnAggregateFunction; + col.column = new ColumnAggregateFunction(aggregates[i].function); sample.insert(col); } @@ -68,7 +72,19 @@ void Aggregator::initialize(Block & block) for (size_t i = 0; i < columns; ++i) if (block.getByPosition(i).column->isConst()) sample.insert(block.getByPosition(i).cloneEmpty()); - } + + /// Инициализируем размеры состояний и смещения для агрегатных функций. + sizes_of_aggregate_states.resize(aggregates_size); + offsets_of_aggregate_states.resize(aggregates_size); + total_size_of_aggregate_states = 0; + + for (size_t i = 0; i < aggregates_size; ++i) + { + sizes_of_aggregate_states[i] = aggregates[i].function->sizeOfData(); + offsets_of_aggregate_states[i] = current_offset; + total_size_of_aggregate_states += sizes_of_aggregate_states[i]; + } + } } @@ -132,7 +148,7 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re bool no_more_keys = false; LOG_TRACE(log, "Aggregating"); - + Stopwatch watch; /// Читаем все данные @@ -168,17 +184,18 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re if (result.type == AggregatedDataVariants::WITHOUT_KEY) { AggregatedDataWithoutKey & res = result.without_key; - if (res.empty()) + if (!res) { - res.resize(aggregates_size); + res = result.aggregates_pool->alloc(total_size_of_aggregate_states); + for (size_t i = 0; i < aggregates_size; ++i) - res[i] = aggregates[i].function->cloneEmpty(); + aggregates[i].function->create(res + offsets_of_aggregate_states[i]); } /// Оптимизация в случае единственной агрегатной функции count. - AggregateFunctionCount * agg_count = dynamic_cast(res[0]); + AggregateFunctionCount * agg_count = dynamic_cast(aggregate_functions[0]); if (aggregates_size == 1 && agg_count) - agg_count->addDelta(rows); + agg_count->addDelta(res, rows); else { for (size_t i = 0; i < rows; ++i) @@ -189,7 +206,7 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re for (size_t k = 0, size = aggregate_arguments[j].size(); k < size; ++k) aggregate_columns[j][k]->get(i, aggregate_arguments[j][k]); - res[j]->add(aggregate_arguments[j]); + aggregate_functions[j]->add(res + offsets_of_aggregate_states[j], aggregate_arguments[j]); } } } @@ -197,15 +214,13 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re else if (result.type == AggregatedDataVariants::KEY_64) { AggregatedDataWithUInt64Key & res = result.key64; - const FieldVisitorToUInt64 visitor; const IColumn & column = *key_columns[0]; /// Для всех строчек for (size_t i = 0; i < rows; ++i) { /// Строим ключ - Field field = column[i]; - UInt64 key = apply_visitor(visitor, field); + UInt64 key = get(column[i]); AggregatedDataWithUInt64Key::iterator it; bool inserted; @@ -222,10 +237,10 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re if (inserted) { - new(&it->second) AggregateFunctionsPlainPtrs(aggregates_size); + it->second = result.aggregates_pool->alloc(total_size_of_aggregate_states); for (size_t j = 0; j < aggregates_size; ++j) - it->second[j] = aggregates[j].function->cloneEmpty(); + aggregate_functions[j]->create(it->second + offsets_of_aggregate_states[j]); } /// Добавляем значения @@ -234,7 +249,7 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re for (size_t k = 0, size = aggregate_arguments[j].size(); k < size; ++k) aggregate_columns[j][k]->get(i, aggregate_arguments[j][k]); - it->second[j]->add(aggregate_arguments[j]); + aggregate_functions[j]->add(it->second + offsets_of_aggregate_states[j], aggregate_arguments[j]); } } } @@ -270,10 +285,10 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re if (inserted) { it->first.data = result.string_pool.insert(ref.data, ref.size); - new(&it->second) AggregateFunctionsPlainPtrs(aggregates_size); + it->second = result.aggregates_pool->alloc(total_size_of_aggregate_states); for (size_t j = 0; j < aggregates_size; ++j) - it->second[j] = aggregates[j].function->cloneEmpty(); + aggregate_functions[j]->create(it->second + offsets_of_aggregate_states[j]); } /// Добавляем значения @@ -282,7 +297,7 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re for (size_t k = 0, size = aggregate_arguments[j].size(); k < size; ++k) aggregate_columns[j][k]->get(i, aggregate_arguments[j][k]); - it->second[j]->add(aggregate_arguments[j]); + aggregate_functions[j]->add(it->second + offsets_of_aggregate_states[j], aggregate_arguments[j]); } } } @@ -313,10 +328,10 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re if (inserted) { it->first.data = result.string_pool.insert(ref.data, ref.size); - new(&it->second) AggregateFunctionsPlainPtrs(aggregates_size); + it->second = result.aggregates_pool->alloc(total_size_of_aggregate_states); for (size_t j = 0; j < aggregates_size; ++j) - it->second[j] = aggregates[j].function->cloneEmpty(); + aggregate_functions[j]->create(it->second + offsets_of_aggregate_states[j]); } /// Добавляем значения @@ -325,7 +340,7 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re for (size_t k = 0, size = aggregate_arguments[j].size(); k < size; ++k) aggregate_columns[j][k]->get(i, aggregate_arguments[j][k]); - it->second[j]->add(aggregate_arguments[j]); + aggregate_functions[j]->add(it->second + offsets_of_aggregate_states[j], aggregate_arguments[j]); } } } @@ -355,11 +370,12 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re if (inserted) { - new(&it->second) AggregatedDataHashed::mapped_type(key, AggregateFunctionsPlainPtrs(aggregates_size)); + new(&it->second) AggregatedDataHashed::mapped_type(key, NULL); + it->second.second = result.aggregates_pool->alloc(total_size_of_aggregate_states); key.resize(keys_size); for (size_t j = 0; j < aggregates_size; ++j) - it->second.second[j] = aggregates[j].function->cloneEmpty(); + aggregate_functions[j]->create(it->second.second + offsets_of_aggregate_states[j]); } /// Добавляем значения @@ -368,7 +384,7 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re for (size_t k = 0, size = aggregate_arguments[j].size(); k < size; ++k) aggregate_columns[j][k]->get(i, aggregate_arguments[j][k]); - it->second.second[j]->add(aggregate_arguments[j]); + aggregate_functions[j]->add(it->second.second + offsets_of_aggregate_states[j], aggregate_arguments[j]); } } } @@ -390,11 +406,12 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re if (no_more_keys) continue; - it = res.insert(std::make_pair(key, AggregateFunctionsPlainPtrs(aggregates_size))).first; + it = res.insert(std::make_pair(key, NULL)).first; + it->second = result.aggregates_pool->alloc(total_size_of_aggregate_states); key.resize(keys_size); for (size_t j = 0; j < aggregates_size; ++j) - it->second[j] = aggregates[j].function->cloneEmpty(); + aggregate_functions[j]->create(it->second + offsets_of_aggregate_states[j]); } /// Добавляем значения @@ -403,7 +420,7 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re for (size_t k = 0, size = aggregate_arguments[j].size(); k < size; ++k) aggregate_columns[j][k]->get(i, aggregate_arguments[j][k]); - it->second[j]->add(aggregate_arguments[j]); + aggregate_functions[j]->add(it->second + offsets_of_aggregate_states[j], aggregate_arguments[j]); } } } @@ -461,7 +478,13 @@ Block Aggregator::convertToBlock(AggregatedDataVariants & data_variants) for (size_t i = 0; i < aggregates_size; ++i) { - aggregate_columns[i] = &static_cast(*res.getByPosition(i + keys_size).column).getData(); + /// Столбец ColumnAggregateFunction захватывает разделяемое владение ареной с состояниями агрегатных функций. + ColumnAggregateFunction & column_aggregate_func = static_cast(*res.getByPosition(i + keys_size).column); + column_aggregate_func.addArena(data_variants.aggregates_pool); + + // + + aggregate_columns[i] = &column_aggregate_func.getData(); aggregate_columns[i]->resize(rows); } @@ -470,7 +493,7 @@ Block Aggregator::convertToBlock(AggregatedDataVariants & data_variants) AggregatedDataWithoutKey & data = data_variants.without_key; for (size_t i = 0; i < aggregates_size; ++i) - (*aggregate_columns[i])[0] = data[i]; + (*aggregate_columns[i])[0] = data + offsets_of_aggregate_states[i]; } else if (data_variants.type == AggregatedDataVariants::KEY_64) { @@ -755,16 +778,14 @@ void Aggregator::merge(BlockInputStreamPtr stream, AggregatedDataVariants & resu else if (result.type == AggregatedDataVariants::KEY_64) { AggregatedDataWithUInt64Key & res = result.key64; - const FieldVisitorToUInt64 visitor; const IColumn & column = *key_columns[0]; /// Для всех строчек for (size_t i = 0; i < rows; ++i) { /// Строим ключ - Field field = column[i]; - UInt64 key = apply_visitor(visitor, field); - + UInt64 key = get(column[i]); + AggregatedDataWithUInt64Key::iterator it; bool inserted; res.emplace(key, it, inserted); diff --git a/dbms/src/Interpreters/Set.cpp b/dbms/src/Interpreters/Set.cpp index 58eb1d90bfe..0319a0ec625 100644 --- a/dbms/src/Interpreters/Set.cpp +++ b/dbms/src/Interpreters/Set.cpp @@ -87,15 +87,13 @@ void Set::create(BlockInputStreamPtr stream) if (type == KEY_64) { SetUInt64 & res = key64; - const FieldVisitorToUInt64 visitor; const IColumn & column = *key_columns[0]; /// Для всех строчек for (size_t i = 0; i < rows; ++i) { /// Строим ключ - Field field = column[i]; - UInt64 key = apply_visitor(visitor, field); + UInt64 key = get(column[i]); res.insert(key); } @@ -297,15 +295,13 @@ void Set::execute(Block & block, const ColumnNumbers & arguments, size_t result, if (type == KEY_64) { const SetUInt64 & set = key64; - const FieldVisitorToUInt64 visitor; const IColumn & column = *key_columns[0]; /// Для всех строчек for (size_t i = 0; i < rows; ++i) { /// Строим ключ - Field field = column[i]; - UInt64 key = apply_visitor(visitor, field); + UInt64 key = get(column[i]); vec_res[i] = negative ^ (set.end() != set.find(key)); } }