Add adaptive allocator. [#CLICKHOUSE-3084]

This commit is contained in:
Vitaliy Lyudvichenko 2017-07-11 20:43:51 +03:00 committed by alexey-milovidov
parent 03e5bf9471
commit 792faaa2db
7 changed files with 147 additions and 19 deletions

View File

@ -22,16 +22,9 @@ AggregateFunctionPtr createAggregateFunctionGroupArray(const std::string & name,
return res;
}
namespace
{
}
AggregateFunctionPtr createAggregateFunctionGroupArray2(const std::string & name, const DataTypes & argument_types, const Array & parameters)
{
std::cerr << "groupArray " << StackTrace().toString() << "\n";
if (argument_types.size() != 1)
throw Exception("Incorrect number of arguments for aggregate function " + name + ", should be 1",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
@ -39,7 +32,10 @@ AggregateFunctionPtr createAggregateFunctionGroupArray2(const std::string & name
bool limit_size = false;
UInt64 max_elems = 0;
if (parameters.empty()) {}
if (parameters.empty())
{
// no limit
}
else if (parameters.size() == 1)
{
if (parameters[0].getType() == Field::Types::Int64 || parameters[0].getType() == Field::Types::UInt64)

View File

@ -113,7 +113,8 @@ struct AggregateFunctionGroupArrayDataNumeric2
/// Memory is allocated to several elements immediately so that the state occupies 64 bytes.
static constexpr size_t bytes_in_arena = 64 - sizeof(PODArrayArenaAllocator<T>);
using Array = PODArrayArenaAllocator<T, bytes_in_arena, ArenaAllocatorWithStackMemoty<bytes_in_arena>>;
//using Array = PODArrayArenaAllocator<T, bytes_in_arena, ArenaAllocatorWithStackMemoty<bytes_in_arena>>;
using Array = PODArrayArenaAllocator<T, bytes_in_arena, MixedArenaAllocator<4096>>;
Array value;
};
@ -128,7 +129,7 @@ class AggregateFunctionGroupArrayNumeric2 final
public:
AggregateFunctionGroupArrayNumeric2(UInt64 max_elems_ = std::numeric_limits<UInt64>::max()) : max_elems(max_elems_) {}
String getName() const override { return "groupArray"; }
String getName() const override { return "groupArray2"; }
DataTypePtr getReturnType() const override
{
@ -666,7 +667,7 @@ public:
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
{
ColumnArray & column_array = typeid_cast<ColumnArray &>(to);
ColumnArray & column_array = static_cast<ColumnArray &>(to);
auto & column_data = column_array.getData();
auto & offsets = column_array.getOffsets();
auto & cur_state = data(place);
@ -744,7 +745,7 @@ public:
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
{
ColumnArray & array_column = typeid_cast<ColumnArray &>(to);
ColumnArray & array_column = static_cast<ColumnArray &>(to);
auto s = data(place).container->size();
array_column.getOffsets().push_back(s);
array_column.getData().insertRangeFrom(*data(place).container, 0, s);

View File

@ -8,7 +8,7 @@ namespace DB
namespace
{
AggregateFunctionPtr createAggregateFunctionQuantile(const std::string & name, const DataTypes & argument_types)
AggregateFunctionPtr createAggregateFunctionQuantile(const std::string & name, const DataTypes & argument_types, const Array & parameters)
{
if (argument_types.size() != 1)
throw Exception("Incorrect number of arguments for aggregate function " + name, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
@ -34,7 +34,7 @@ AggregateFunctionPtr createAggregateFunctionQuantile(const std::string & name, c
}
AggregateFunctionPtr createAggregateFunctionQuantiles(const std::string & name, const DataTypes & argument_types)
AggregateFunctionPtr createAggregateFunctionQuantiles(const std::string & name, const DataTypes & argument_types, const Array & parameters)
{
if (argument_types.size() != 1)
throw Exception("Incorrect number of arguments for aggregate function " + name, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

View File

@ -0,0 +1,78 @@
#include <AggregateFunctions/parseAggregateFunctionParameters.h>
#include <Parsers/ExpressionListParsers.h>
#include <Parsers/parseQuery.h>
#include <Common/typeid_cast.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int PARAMETERS_TO_AGGREGATE_FUNCTIONS_MUST_BE_LITERALS;
}
Array getAggregateFunctionParametersArray(const ASTPtr & expression_list, const std::string & error_context)
{
const ASTs & parameters = typeid_cast<const ASTExpressionList &>(*expression_list).children;
if (parameters.empty())
throw Exception("Parameters list to aggregate functions cannot be empty", ErrorCodes::BAD_ARGUMENTS);
Array params_row(parameters.size());
for (size_t i = 0; i < parameters.size(); ++i)
{
const ASTLiteral * lit = typeid_cast<const ASTLiteral *>(parameters[i].get());
if (!lit)
{
throw Exception("Parameters to aggregate functions must be literals" + (error_context.empty() ? "" : " (in " + error_context +")"),
ErrorCodes::PARAMETERS_TO_AGGREGATE_FUNCTIONS_MUST_BE_LITERALS);
}
params_row[i] = lit->value;
}
return params_row;
}
void getAggregateFunctionNameAndParametersArray(
const std::string & aggregate_function_name_with_params,
std::string & aggregate_function_name,
Array & aggregate_function_parameters,
const std::string & error_context)
{
if (aggregate_function_name_with_params.back() != ')')
{
aggregate_function_name = aggregate_function_name_with_params;
aggregate_function_parameters = Array();
return;
}
size_t pos = aggregate_function_name_with_params.find('(');
if (pos == std::string::npos || pos + 2 >= aggregate_function_name_with_params.size())
throw Exception(aggregate_function_name_with_params + " doesn't look like aggregate function name in " + error_context + ".",
ErrorCodes::BAD_ARGUMENTS);
aggregate_function_name = aggregate_function_name_with_params.substr(0, pos);
std::string parameters_str = aggregate_function_name_with_params.substr(pos + 1, aggregate_function_name_with_params.size() - pos - 2);
if (aggregate_function_name.empty())
throw Exception(aggregate_function_name_with_params + " doesn't look like aggregate function name in " + error_context + ".",
ErrorCodes::BAD_ARGUMENTS);
ParserExpressionList params_parser(false);
ASTPtr args_ast = parseQuery(params_parser,
parameters_str.data(), parameters_str.data() + parameters_str.size(),
"parameters of aggregate function in " + error_context);
ASTExpressionList & args_list = typeid_cast<ASTExpressionList &>(*args_ast);
if (args_list.children.empty())
throw Exception("Incorrect list of parameters to aggregate function "
+ aggregate_function_name, ErrorCodes::BAD_ARGUMENTS);
aggregate_function_parameters = getAggregateFunctionParametersArray(args_ast);
}
}

View File

@ -0,0 +1,18 @@
#pragma once
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTExpressionList.h>
namespace DB
{
Array getAggregateFunctionParametersArray(const ASTPtr & expression_list, const std::string & error_context = "");
void getAggregateFunctionNameAndParametersArray(
const std::string & aggregate_function_name_with_params,
std::string & aggregate_function_name,
Array & aggregate_function_parameters,
const std::string & error_context);
}

View File

@ -1,5 +1,6 @@
#include <Common/PODArray.h>
#include <Common/Arena.h>
#include <Common/Allocator.h>
namespace DB
@ -9,20 +10,19 @@ namespace DB
/// Used in aggregate functions
struct ArenaAllocator
{
void * alloc(size_t size, Arena * arena)
static void * alloc(size_t size, Arena * arena)
{
return arena->alloc(size);
}
void * realloc(void * buf, size_t old_size, size_t new_size, Arena * arena)
static void * realloc(void * buf, size_t old_size, size_t new_size, Arena * arena)
{
char const * data = reinterpret_cast<char *>(buf);
// Invariant should be maintained: new_size > old_size
if (data + old_size == arena->head->pos)
{
// Consecutive optimization
// Invariant should be maintained: new_size > old_size
arena->allocContinue(new_size - old_size, data);
return reinterpret_cast<void *>(const_cast<char *>(data));
}
@ -32,7 +32,40 @@ struct ArenaAllocator
}
}
void free(void * buf, size_t size) {}
static void free(void * buf, size_t size) {}
};
template <size_t REAL_ALLOCATION_TRESHOLD = 4096, typename TRealAllocator = Allocator<false>, typename TArenaAllocator = ArenaAllocator>
class MixedArenaAllocator : private TRealAllocator
{
public:
void * alloc(size_t size, Arena * arena)
{
return (size < REAL_ALLOCATION_TRESHOLD) ? TArenaAllocator::alloc(size, arena) : TRealAllocator::alloc(size);
}
void * realloc(void * buf, size_t old_size, size_t new_size, Arena * arena)
{
// Invariant should be maintained: new_size > old_size
if (new_size < REAL_ALLOCATION_TRESHOLD)
return TArenaAllocator::realloc(buf, old_size, new_size, arena);
if (old_size >= REAL_ALLOCATION_TRESHOLD)
return TRealAllocator::realloc(buf, old_size, new_size);
void * new_buf = TRealAllocator::alloc(new_size);
memcpy(new_buf, buf, old_size);
return new_buf;
}
void free(void * buf, size_t size)
{
if (size >= REAL_ALLOCATION_TRESHOLD)
TRealAllocator::free(buf, size);
}
};

View File

@ -14,3 +14,5 @@ SELECT count(), sum(toUInt64(ns)), max(toUInt64(ns)) FROM (SELECT intDiv(number,
SELECT count(), sum(toUInt64(ns[1])), max(toUInt64(ns[1])), sum(toUInt64(ns[2]))/10 FROM (SELECT intDiv(number, 100) AS k, groupArray2([toString(number), toString(number*10)]) AS ns FROM remote('127.0.0.{1,2}', 'test', 'numbers_mt') GROUP BY k) ARRAY JOIN ns;
DROP TABLE test.numbers_mt;
-- clickhouse-local -q "SELECT arrayReduce('groupArrayState', [['1'], ['22'], ['333']]) FORMAT RowBinary" | clickhouse-local --input-format RowBinary --structure "d AggregateFunction(groupArray2, Array(String))" -q "SELECT groupArray2Merge(d) FROM table"