Added allocatesMemoryInArena() method for aggregate functions.

Fixed runningAccumulate, now it works properly for complex functions.
More accurate threads handling in Aggregator.
This commit is contained in:
Vitaliy Lyudvichenko 2016-09-26 19:50:13 +03:00
parent 7103157b1b
commit a3d72db2aa
8 changed files with 49 additions and 12 deletions

View File

@ -104,7 +104,10 @@ public:
/// Generic implementation, it uses serialized representation as object descriptor.
struct AggreagteFunctionGroupUniqArrayGenericData
{
using Set = HashSetWithSavedHash<StringRef, StringRefHash, HashTableGrower<4>, HashTableAllocatorWithStackMemory<16>>;
static constexpr size_t INIT_ELEMS = 2; /// adjustable
static constexpr size_t ELEM_SIZE = sizeof(HashSetCellWithSavedHash<StringRef, StringRefHash>);
using Set = HashSetWithSavedHash<StringRef, StringRefHash, HashTableGrower<INIT_ELEMS>, HashTableAllocatorWithStackMemory<INIT_ELEMS * ELEM_SIZE>>;
Set value;
};
@ -136,6 +139,11 @@ public:
return std::make_shared<DataTypeArray>(input_data_type->clone());
}
bool allocatesMemoryInArena() const override
{
return true;
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
{
auto & set = this->data(place).value;

View File

@ -93,6 +93,12 @@ public:
/// Deserializes state. This function is called only for empty (just created) states.
virtual void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena * arena) const = 0;
/// Returns true if a function requires Arena to handle own states (see add(), merge(), deserialize()).
virtual bool allocatesMemoryInArena() const
{
return false;
}
/// Inserts results into a column.
virtual void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const = 0;

View File

@ -159,7 +159,8 @@ public:
/// Объединить состояние в последней строке с заданным
void insertMergeFrom(const IColumn & src, size_t n)
{
func->merge(getData().back(), static_cast<const ColumnAggregateFunction &>(src).getData()[n], &createOrGetArena());
Arena & arena = createOrGetArena();
func->merge(getData().back(), static_cast<const ColumnAggregateFunction &>(src).getData()[n], &arena);
}
Arena & createOrGetArena()

View File

@ -12,6 +12,7 @@
#include <DB/Common/SipHash.h>
#include <DB/Common/memcpySmall.h>
namespace DB
{

View File

@ -25,6 +25,7 @@
#include <DB/Common/UnicodeBar.h>
#include <DB/Functions/IFunction.h>
#include <DB/Functions/NumberTraits.h>
#include <DB/Functions/ObjectPool.h>
#include <DB/Interpreters/ExpressionActions.h>
#include <ext/range.hpp>
@ -1242,13 +1243,22 @@ public:
IColumn & result_column = *result_column_ptr;
result_column.reserve(column_with_states->size());
auto arena = (agg_func.allocatesMemoryInArena()) ?
arenas_pool.get(0, []{ return new Arena(); }) :
nullptr;
const auto & states = column_with_states->getData();
for (const auto & state_to_add : states)
{
agg_func.merge(place.get(), state_to_add, nullptr); /// Empty arena!
/// Will pass empty arena if agg_func does not allocate memory in arena
agg_func.merge(place.get(), state_to_add, arena.get());
agg_func.insertResultInto(place.get(), result_column);
}
}
private:
ObjectPool<Arena, int> arenas_pool; /// Used only for complex functions
};

View File

@ -1,3 +1,4 @@
#pragma once
#include <map>
#include <memory>
#include <stack>

View File

@ -1174,7 +1174,7 @@ protected:
template <typename Method>
void mergeBucketImpl(
ManyAggregatedDataVariants & data, Int32 bucket) const;
ManyAggregatedDataVariants & data, Int32 bucket, Arena * arena) const;
template <typename Method>
void convertBlockToTwoLevelImpl(

View File

@ -1480,7 +1480,7 @@ void NO_INLINE Aggregator::mergeSingleLevelDataImpl(
template <typename Method>
void NO_INLINE Aggregator::mergeBucketImpl(
ManyAggregatedDataVariants & data, Int32 bucket) const
ManyAggregatedDataVariants & data, Int32 bucket, Arena * arena) const
{
/// Все результаты агрегации соединяем с первым.
AggregatedDataVariantsPtr & res = data[0];
@ -1488,9 +1488,6 @@ void NO_INLINE Aggregator::mergeBucketImpl(
{
AggregatedDataVariants & current = *data[i];
/// Select Arena to avoid race conditions
Arena * arena = res->aggregates_pools.at(static_cast<size_t>(bucket) % size).get();
mergeDataImpl<Method>(
getDataVariant<Method>(*res).data.impls[bucket],
getDataVariant<Method>(current).data.impls[bucket],
@ -1511,7 +1508,16 @@ public:
* которые все либо являются одноуровневыми, либо являются двухуровневыми.
*/
MergingAndConvertingBlockInputStream(const Aggregator & aggregator_, ManyAggregatedDataVariants & data_, bool final_, size_t threads_)
: aggregator(aggregator_), data(data_), final(final_), threads(threads_) {}
: aggregator(aggregator_), data(data_), final(final_), threads(threads_)
{
/// At least we need one arena in first data item per thread
if (!data.empty() && threads > data[0]->aggregates_pools.size())
{
Arenas & first_pool = data[0]->aggregates_pools;
for (size_t j = first_pool.size(); j < threads; j++)
first_pool.emplace_back(std::make_shared<Arena>());
}
}
String getName() const override { return "MergingAndConverting"; }
@ -1653,17 +1659,21 @@ private:
try
{
/// TODO Возможно, поддержать no_more_keys
/// TODO: add no_more_keys support maybe
auto & merged_data = *data[0];
auto method = merged_data.type;
Block block;
/// Select Arena to avoid race conditions
size_t thread_number = static_cast<size_t>(bucket_num) % threads;
Arena * arena = merged_data.aggregates_pools.at(thread_number).get();
if (false) {}
#define M(NAME) \
else if (method == AggregatedDataVariants::Type::NAME) \
{ \
aggregator.mergeBucketImpl<decltype(merged_data.NAME)::element_type>(data, bucket_num); \
aggregator.mergeBucketImpl<decltype(merged_data.NAME)::element_type>(data, bucket_num, arena); \
block = aggregator.convertOneBucketToBlock(merged_data, *merged_data.NAME, final, bucket_num); \
}