From a3d72db2aafcfa74bc9384569e1269d203a753b3 Mon Sep 17 00:00:00 2001 From: Vitaliy Lyudvichenko Date: Mon, 26 Sep 2016 19:50:13 +0300 Subject: [PATCH] Added allocatesMemoryInArena() method for aggregate functions. Fixed runningAccumulate, now it works properly for complex functions. More accurate threads handling in Aggregator. --- .../AggregateFunctionGroupUniqArray.h | 10 +++++++- .../AggregateFunctions/IAggregateFunction.h | 6 +++++ .../DB/Columns/ColumnAggregateFunction.h | 3 ++- dbms/include/DB/Columns/ColumnString.h | 1 + .../DB/Functions/FunctionsMiscellaneous.h | 12 +++++++++- dbms/include/DB/Functions/ObjectPool.h | 1 + dbms/include/DB/Interpreters/Aggregator.h | 4 ++-- dbms/src/Interpreters/Aggregator.cpp | 24 +++++++++++++------ 8 files changed, 49 insertions(+), 12 deletions(-) diff --git a/dbms/include/DB/AggregateFunctions/AggregateFunctionGroupUniqArray.h b/dbms/include/DB/AggregateFunctions/AggregateFunctionGroupUniqArray.h index acb904d784b..92c8ec8d602 100644 --- a/dbms/include/DB/AggregateFunctions/AggregateFunctionGroupUniqArray.h +++ b/dbms/include/DB/AggregateFunctions/AggregateFunctionGroupUniqArray.h @@ -104,7 +104,10 @@ public: /// Generic implementation, it uses serialized representation as object descriptor. struct AggreagteFunctionGroupUniqArrayGenericData { - using Set = HashSetWithSavedHash, HashTableAllocatorWithStackMemory<16>>; + static constexpr size_t INIT_ELEMS = 2; /// adjustable + static constexpr size_t ELEM_SIZE = sizeof(HashSetCellWithSavedHash); + using Set = HashSetWithSavedHash, HashTableAllocatorWithStackMemory>; + Set value; }; @@ -136,6 +139,11 @@ public: return std::make_shared(input_data_type->clone()); } + bool allocatesMemoryInArena() const override + { + return true; + } + void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override { auto & set = this->data(place).value; diff --git a/dbms/include/DB/AggregateFunctions/IAggregateFunction.h b/dbms/include/DB/AggregateFunctions/IAggregateFunction.h index 34d5a4daf30..61cbb7e9f55 100644 --- a/dbms/include/DB/AggregateFunctions/IAggregateFunction.h +++ b/dbms/include/DB/AggregateFunctions/IAggregateFunction.h @@ -93,6 +93,12 @@ public: /// Deserializes state. This function is called only for empty (just created) states. virtual void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena * arena) const = 0; + /// Returns true if a function requires Arena to handle own states (see add(), merge(), deserialize()). + virtual bool allocatesMemoryInArena() const + { + return false; + } + /// Inserts results into a column. virtual void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const = 0; diff --git a/dbms/include/DB/Columns/ColumnAggregateFunction.h b/dbms/include/DB/Columns/ColumnAggregateFunction.h index f087a3e980d..f60a6129402 100644 --- a/dbms/include/DB/Columns/ColumnAggregateFunction.h +++ b/dbms/include/DB/Columns/ColumnAggregateFunction.h @@ -159,7 +159,8 @@ public: /// Объединить состояние в последней строке с заданным void insertMergeFrom(const IColumn & src, size_t n) { - func->merge(getData().back(), static_cast(src).getData()[n], &createOrGetArena()); + Arena & arena = createOrGetArena(); + func->merge(getData().back(), static_cast(src).getData()[n], &arena); } Arena & createOrGetArena() diff --git a/dbms/include/DB/Columns/ColumnString.h b/dbms/include/DB/Columns/ColumnString.h index 84b6d3f73b4..b7846f443d6 100644 --- a/dbms/include/DB/Columns/ColumnString.h +++ b/dbms/include/DB/Columns/ColumnString.h @@ -12,6 +12,7 @@ #include #include + namespace DB { diff --git a/dbms/include/DB/Functions/FunctionsMiscellaneous.h b/dbms/include/DB/Functions/FunctionsMiscellaneous.h index 7a07c82045e..dde32298635 100644 --- a/dbms/include/DB/Functions/FunctionsMiscellaneous.h +++ b/dbms/include/DB/Functions/FunctionsMiscellaneous.h @@ -25,6 +25,7 @@ #include #include #include +#include #include #include @@ -1242,13 +1243,22 @@ public: IColumn & result_column = *result_column_ptr; result_column.reserve(column_with_states->size()); + auto arena = (agg_func.allocatesMemoryInArena()) ? + arenas_pool.get(0, []{ return new Arena(); }) : + nullptr; + const auto & states = column_with_states->getData(); for (const auto & state_to_add : states) { - agg_func.merge(place.get(), state_to_add, nullptr); /// Empty arena! + /// Will pass empty arena if agg_func does not allocate memory in arena + agg_func.merge(place.get(), state_to_add, arena.get()); agg_func.insertResultInto(place.get(), result_column); } } + +private: + + ObjectPool arenas_pool; /// Used only for complex functions }; diff --git a/dbms/include/DB/Functions/ObjectPool.h b/dbms/include/DB/Functions/ObjectPool.h index d6bf82c4062..039822de206 100644 --- a/dbms/include/DB/Functions/ObjectPool.h +++ b/dbms/include/DB/Functions/ObjectPool.h @@ -1,3 +1,4 @@ +#pragma once #include #include #include diff --git a/dbms/include/DB/Interpreters/Aggregator.h b/dbms/include/DB/Interpreters/Aggregator.h index 252033b4bc3..90e2f3771af 100644 --- a/dbms/include/DB/Interpreters/Aggregator.h +++ b/dbms/include/DB/Interpreters/Aggregator.h @@ -384,7 +384,7 @@ struct AggregationMethodConcat { /** Исправление, если все ключи - пустые массивы. Для них в хэш-таблицу записывается StringRef нулевой длины, но с ненулевым указателем. * Но при вставке в хэш-таблицу, такой StringRef оказывается равен другому ключу нулевой длины, - * у которого указатель на данные может быть любым мусором и использовать его нельзя. + * у которого указатель на данные может быть любым мусором и использовать его нельзя. */ for (size_t i = 0; i < keys_size; ++i) key_columns[i]->insertDefault(); @@ -1174,7 +1174,7 @@ protected: template void mergeBucketImpl( - ManyAggregatedDataVariants & data, Int32 bucket) const; + ManyAggregatedDataVariants & data, Int32 bucket, Arena * arena) const; template void convertBlockToTwoLevelImpl( diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index 0ff62804c73..d5fd5b8c9bd 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -1480,7 +1480,7 @@ void NO_INLINE Aggregator::mergeSingleLevelDataImpl( template void NO_INLINE Aggregator::mergeBucketImpl( - ManyAggregatedDataVariants & data, Int32 bucket) const + ManyAggregatedDataVariants & data, Int32 bucket, Arena * arena) const { /// Все результаты агрегации соединяем с первым. AggregatedDataVariantsPtr & res = data[0]; @@ -1488,9 +1488,6 @@ void NO_INLINE Aggregator::mergeBucketImpl( { AggregatedDataVariants & current = *data[i]; - /// Select Arena to avoid race conditions - Arena * arena = res->aggregates_pools.at(static_cast(bucket) % size).get(); - mergeDataImpl( getDataVariant(*res).data.impls[bucket], getDataVariant(current).data.impls[bucket], @@ -1511,7 +1508,16 @@ public: * которые все либо являются одноуровневыми, либо являются двухуровневыми. */ MergingAndConvertingBlockInputStream(const Aggregator & aggregator_, ManyAggregatedDataVariants & data_, bool final_, size_t threads_) - : aggregator(aggregator_), data(data_), final(final_), threads(threads_) {} + : aggregator(aggregator_), data(data_), final(final_), threads(threads_) + { + /// At least we need one arena in first data item per thread + if (!data.empty() && threads > data[0]->aggregates_pools.size()) + { + Arenas & first_pool = data[0]->aggregates_pools; + for (size_t j = first_pool.size(); j < threads; j++) + first_pool.emplace_back(std::make_shared()); + } + } String getName() const override { return "MergingAndConverting"; } @@ -1653,17 +1659,21 @@ private: try { - /// TODO Возможно, поддержать no_more_keys + /// TODO: add no_more_keys support maybe auto & merged_data = *data[0]; auto method = merged_data.type; Block block; + /// Select Arena to avoid race conditions + size_t thread_number = static_cast(bucket_num) % threads; + Arena * arena = merged_data.aggregates_pools.at(thread_number).get(); + if (false) {} #define M(NAME) \ else if (method == AggregatedDataVariants::Type::NAME) \ { \ - aggregator.mergeBucketImpl(data, bucket_num); \ + aggregator.mergeBucketImpl(data, bucket_num, arena); \ block = aggregator.convertOneBucketToBlock(merged_data, *merged_data.NAME, final, bucket_num); \ }