dbms: Server: Added an optional parameter to runningAccumulate to make it possible to order merging operations within a block according to a key. It is used only for research and testing purposes. [#METR-17276]

This commit is contained in:
Alexey Arno 2015-08-25 19:34:28 +03:00
parent 2c61eacd03
commit e32d02ebb5

View File

@ -964,26 +964,14 @@ private:
*/
class FunctionRunningAccumulate : public IFunction
{
public:
static constexpr auto name = "runningAccumulate";
static IFunction * create(const Context & context) { return new FunctionRunningAccumulate; }
String getName() const override { return name; }
DataTypePtr getReturnType(const DataTypes & arguments) const override
private:
template <typename T>
bool checkType(const IDataType * type) const
{
if (arguments.size() != 1)
throw Exception("Function " + getName() + " requires exactly one argument.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
const DataTypeAggregateFunction * type = typeid_cast<const DataTypeAggregateFunction *>(&*arguments[0]);
if (!type)
throw Exception("Argument for function " + getName() + " must have type AggregateFunction - state of aggregate function.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return type->getReturnType()->clone();
return typeid_cast<const T *>(type) != nullptr;
}
void execute(Block & block, const ColumnNumbers & arguments, size_t result) override
void executeWithoutKey(Block & block, const ColumnNumbers & arguments, size_t result)
{
const ColumnAggregateFunction * column_with_states = typeid_cast<const ColumnAggregateFunction *>(&*block.getByPosition(arguments.at(0)).column);
if (!column_with_states)
@ -1011,8 +999,120 @@ public:
agg_func.insertResultInto(place.get(), result_column);
}
}
};
template <typename T>
bool executeForType(Block & block, const ColumnNumbers & arguments, size_t result)
{
using ColumnType = ColumnVector<T>;
if (const auto * column_keys = typeid_cast<const ColumnType *>(&*block.getByPosition(arguments[1]).column))
{
const ColumnAggregateFunction * column_with_states = typeid_cast<const ColumnAggregateFunction *>(&*block.getByPosition(arguments.at(0)).column);
if (!column_with_states)
throw Exception(
"Illegal column " + block.getByPosition(arguments.at(0)).column->getName() + " of first argument of function " + getName(),
ErrorCodes::ILLEGAL_COLUMN);
AggregateFunctionPtr aggregate_function_ptr = column_with_states->getAggregateFunction();
const IAggregateFunction & agg_func = *aggregate_function_ptr;
auto deleter = [&agg_func] (char * ptr) { agg_func.destroy(ptr); free(ptr); };
std::unique_ptr<char, decltype(deleter)> place { reinterpret_cast<char *>(malloc(agg_func.sizeOfData())), deleter };
agg_func.create(place.get()); /// Немного не exception-safe. Если здесь выкинется исключение, то зря вызовется destroy.
ColumnPtr result_column_ptr = agg_func.getReturnType()->createColumn();
block.getByPosition(result).column = result_column_ptr;
IColumn & result_column = *result_column_ptr;
result_column.reserve(column_with_states->size());
const auto & states = column_with_states->getData();
const auto & keys = column_keys->getData();
size_t size = states.size();
using Position = typename std::pair<T, size_t>;
std::vector<Position> positions;
positions.reserve(size);
for (size_t i = 0; i < size; ++i)
positions.push_back(std::make_pair(keys[i], i));
std::sort(positions.begin(), positions.end(), [](const Position & lhs, const Position & rhs)
{
return lhs.first < rhs.first;
});
for (const auto & pos : positions)
{
size_t index = pos.second;
agg_func.merge(place.get(), states[index]);
agg_func.insertResultInto(place.get(), result_column);
}
return true;
}
else
return false;
}
public:
static constexpr auto name = "runningAccumulate";
static IFunction * create(const Context & context) { return new FunctionRunningAccumulate; }
String getName() const override { return name; }
DataTypePtr getReturnType(const DataTypes & arguments) const override
{
if ((arguments.size() < 1) || (arguments.size() > 2))
throw Exception("Function " + getName() + " requires 1 or 2 arguments.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
if (arguments.size() == 2)
{
const IDataType * type = &*arguments[1];
if (! (checkType<DataTypeUInt8>(type)
|| checkType<DataTypeUInt16>(type)
|| checkType<DataTypeUInt32>(type)
|| checkType<DataTypeUInt64>(type)
|| checkType<DataTypeInt8>(type)
|| checkType<DataTypeInt16>(type)
|| checkType<DataTypeInt32>(type)
|| checkType<DataTypeInt64>(type)))
{
throw Exception("Illegal type in second argument of function " + getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
}
const DataTypeAggregateFunction * type = typeid_cast<const DataTypeAggregateFunction *>(&*arguments[0]);
if (!type)
throw Exception("Argument for function " + getName() + " must have type AggregateFunction - state of aggregate function.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return type->getReturnType()->clone();
}
void execute(Block & block, const ColumnNumbers & arguments, size_t result) override
{
if (arguments.size() == 1)
executeWithoutKey(block, arguments, result);
else if (arguments.size() == 2)
{
if (!( executeForType<UInt8>(block, arguments, result)
|| executeForType<UInt16>(block, arguments, result)
|| executeForType<UInt32>(block, arguments, result)
|| executeForType<UInt64>(block, arguments, result)
|| executeForType<Int8>(block, arguments, result)
|| executeForType<Int16>(block, arguments, result)
|| executeForType<Int32>(block, arguments, result)
|| executeForType<Int64>(block, arguments, result)))
{
throw Exception("Illegal column " + block.getByPosition(arguments[0]).column->getName()
+ " of argument of function " + getName(),
ErrorCodes::ILLEGAL_COLUMN);
}
}
else
throw Exception("Function " + getName() + " requires 1 or 2 arguments.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
}
};
/** Принимает состояние агрегатной функции. Возвращает результат агрегации.
*/