Fixed arrayReduce. Refactoring of ObjectPool. [#CLICKHOUSE-2787]

This commit is contained in:
Vitaliy Lyudvichenko 2017-02-01 17:00:12 +03:00 committed by alexey-milovidov
parent 1302c3958a
commit 76916739e2
9 changed files with 82 additions and 70 deletions

View File

@ -15,6 +15,7 @@
#include <DB/Functions/IFunction.h>
#include <DB/Functions/DataTypeTraits.h>
#include <DB/Functions/ObjectPool.h>
#include <DB/Common/StringUtils.h>
#include <ext/range.hpp>
@ -1424,8 +1425,8 @@ private:
class IAggregateFunction;
using AggregateFunctionPtr = std::shared_ptr<IAggregateFunction>;
/** Применяет к массиву агрегатную функцию и возвращает её результат.
* Также может быть применена к нескольким массивам одинаковых размеров, если агрегатная функция принимает несколько аргументов.
/** Applies an aggregate function to an array and returns its result.
* If the aggregate function takes multiple arguments, this function can be applied to multiple arrays of the same size.
*/
class FunctionArrayReduce : public IFunction
{

View File

@ -1251,15 +1251,13 @@ public:
agg_func.create(place.get()); /// Немного не exception-safe. Если здесь выкинется исключение, то зря вызовется destroy.
std::unique_ptr<Arena> arena = agg_func.allocatesMemoryInArena() ? std::make_unique<Arena>() : nullptr;
ColumnPtr result_column_ptr = agg_func.getReturnType()->createColumn();
block.safeGetByPosition(result).column = result_column_ptr;
IColumn & result_column = *result_column_ptr;
result_column.reserve(column_with_states->size());
auto arena = (agg_func.allocatesMemoryInArena())
? arenas_pool.get(0, []{ return new Arena(); })
: nullptr;
const auto & states = column_with_states->getData();
for (const auto & state_to_add : states)
{
@ -1268,10 +1266,6 @@ public:
agg_func.insertResultInto(place.get(), result_column);
}
}
private:
ObjectPool<Arena, int> arenas_pool; /// Used only for complex functions
};

View File

@ -432,7 +432,7 @@ inline bool likePatternIsStrstr(const String & pattern, String & res)
namespace Regexps
{
using Regexp = OptimizedRegularExpressionImpl<false>;
using Pool = ObjectPool<Regexp, String>;
using Pool = ObjectPoolMap<Regexp, String>;
/// Creates a Regexp for the given pattern string and flags.
/// The braced return list-initializes the returned Regexp directly from the two arguments.
/// The `like` template parameter is not used by this definition.
template <bool like>
inline Regexp createRegexp(const std::string & pattern, int flags) { return {pattern, flags}; }

View File

@ -18,82 +18,86 @@ namespace DB
* than number of running/sleeping threads, that has ever used object,
* and creation/destruction of objects is expensive).
*/
template <typename T, typename Key>
class ObjectPool
template <typename T>
class SimpleObjectPool
{
private:
struct Holder;
struct Deleter;
protected:
/// Holds all available objects in a stack.
std::mutex mutex;
std::stack<std::unique_ptr<T>> stack;
/// Custom deleter for the std::unique_ptr handed out by the pool.
/// Instead of destroying the object, it pushes the pointer back onto the
/// parent pool's stack, which becomes its owner again.
struct Deleter
{
    /// Pool that receives the object back; nullptr for a default-constructed deleter.
    SimpleObjectPool<T> * parent;

    Deleter(SimpleObjectPool<T> * parent_ = nullptr) : parent(parent_) {}

    void operator()(T * returned_object) const
    {
        /// The stack is protected by the pool's mutex, so take it before pushing.
        std::lock_guard<std::mutex> guard(parent->mutex);
        parent->stack.emplace(returned_object);
    }
};
public:
using Pointer = std::unique_ptr<T, Deleter>;
/// Returns a pooled object: reuses the one on top of the stack when the stack
/// is non-empty, otherwise creates a fresh one by calling f().
/// f() is invoked after the mutex has been released.
template <typename Factory>
Pointer get(Factory && f)
{
    std::unique_lock<std::mutex> guard(mutex);

    if (!stack.empty())
    {
        /// Take ownership away from the stack entry before removing it.
        T * reused = stack.top().release();
        stack.pop();
        return { reused, this };
    }

    /// Nothing to reuse: drop the lock first, then build a new object.
    guard.unlock();
    return { f(), this };
}
/// Same as get(), except that a missing object is created with `new T`
/// (default-initialization) instead of a caller-supplied factory.
Pointer getDefault()
{
    auto make_default = [] { return new T; };
    return get(make_default);
}
};
/// Like SimpleObjectPool, but additionally allows storing different kinds of objects, each identified by a Key.
template <typename T, typename Key>
class ObjectPoolMap
{
private:
/// Holds all objects for same key.
struct Holder
{
std::mutex mutex;
std::stack<std::unique_ptr<T>> stack;
/** Extracts and returns a pointer from the collection if it's not empty,
* creates a new one by calling provided f() otherwise.
*/
template <typename Factory>
Pointer get(Factory && f)
{
std::unique_lock<std::mutex> lock(mutex);
if (stack.empty())
{
lock.unlock();
return { f(), this };
}
auto object = stack.top().release();
stack.pop();
return { object, this };
}
};
/** Specialized deleter for std::unique_ptr.
* Returns underlying pointer back to holder thus reclaiming its ownership.
*/
struct Deleter
{
Holder * holder;
Deleter(Holder * holder = nullptr) : holder{holder} {}
void operator()(T * owning_ptr) const
{
std::lock_guard<std::mutex> lock{holder->mutex};
holder->stack.emplace(owning_ptr);
}
};
using Object = SimpleObjectPool<T>;
/// Key -> objects
using Container = std::map<Key, std::unique_ptr<Holder>>;
using Container = std::map<Key, std::unique_ptr<Object>>;
Container container;
std::mutex mutex;
public:
/// f is a function that takes zero arguments (it usually captures the key from the outer scope)
/// and returns a plain pointer to a newly created object.
using Pointer = typename Object::Pointer;
template <typename Factory>
Pointer get(const Key & key, Factory && f)
{
typename Container::iterator it;
std::unique_lock<std::mutex> lock(mutex);
{
std::unique_lock<std::mutex> lock(mutex);
it = container.find(key);
if (container.end() == it)
it = container.emplace(key, std::make_unique<Holder>()).first;
}
auto it = container.find(key);
if (container.end() == it)
it = container.emplace(key, std::make_unique<Object>()).first;
return it->second->get(std::forward<Factory>(f));
}

View File

@ -2743,6 +2743,8 @@ void FunctionArrayReduce::executeImpl(Block & block, const ColumnNumbers & argum
std::unique_ptr<char[]> place_holder { new char[agg_func.sizeOfData()] };
AggregateDataPtr place = place_holder.get();
std::unique_ptr<Arena> arena = agg_func.allocatesMemoryInArena() ? std::make_unique<Arena>() : nullptr;
size_t rows = block.rows();
/// Aggregate functions do not support constant columns, so we materialize them.
@ -2785,7 +2787,7 @@ void FunctionArrayReduce::executeImpl(Block & block, const ColumnNumbers & argum
try
{
for (size_t j = current_offset; j < next_offset; ++j)
agg_func.add(place, aggregate_arguments, j, nullptr);
agg_func.add(place, aggregate_arguments, j, arena.get());
agg_func.insertResultInto(place, res_col);
}

View File

@ -55,7 +55,7 @@ private:
};
/// Separate converter is created for each thread.
using Pool = ObjectPool<IConv, CharsetsFromTo>;
using Pool = ObjectPoolMap<IConv, CharsetsFromTo>;
Pool::Pointer getConverter(const CharsetsFromTo & charsets)
{

View File

@ -172,6 +172,10 @@ static void appendGraphitePattern(const Context & context,
throw Exception("Aggregate function is mandatory for retention patterns in GraphiteMergeTree",
ErrorCodes::NO_ELEMENTS_IN_CONFIG);
if (pattern.function->allocatesMemoryInArena())
throw Exception("Aggregate function " + pattern.function->getName() + " isn't supported in GraphiteMergeTree",
ErrorCodes::NOT_IMPLEMENTED);
/// Retentions must be ordered by descending age.
std::sort(pattern.retentions.begin(), pattern.retentions.end(),
[] (const Graphite::Retention & a, const Graphite::Retention & b) { return a.age > b.age; });

View File

@ -18,3 +18,6 @@
1000
1000
1000
3
2 2
65

View File

@ -4,3 +4,7 @@ INSERT INTO test.arena SELECT number % 10 AS k, hex(intDiv(number, 10) % 1000) A
SELECT length(groupUniqArrayIf(d, d != hex(0))) FROM test.arena GROUP BY k;
SELECT length(groupUniqArrayMerge(ds)) FROM (SELECT k, groupUniqArrayState(d) AS ds FROM test.arena GROUP BY k) GROUP BY k;
DROP TABLE IF EXISTS test.arena;
SELECT length(arrayReduce('groupUniqArray', [[1, 2], [1], emptyArrayUInt8(), [1], [1, 2]]));
SELECT min(x), max(x) FROM (SELECT length(arrayReduce('groupUniqArray', [hex(number), hex(number+1), hex(number)])) AS x FROM system.numbers LIMIT 100000);
SELECT sum(length(runningAccumulate(x))) FROM (SELECT groupUniqArrayState(toString(number % 10)) AS x, number FROM (SELECT * FROM system.numbers LIMIT 11) GROUP BY number ORDER BY number);