mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-30 03:22:14 +00:00
better code
This commit is contained in:
parent
e4a1ba728d
commit
83677e4e44
@ -1152,6 +1152,95 @@ class FunctionBinaryArithmetic : public IFunction
|
|||||||
&& checkDataType<DataTypeAggregateFunction>(type1.get());
|
&& checkDataType<DataTypeAggregateFunction>(type1.get());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void executeAggregateMultiply(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) const
|
||||||
|
{
|
||||||
|
ColumnNumbers new_arguments = arguments;
|
||||||
|
if (checkDataType<DataTypeAggregateFunction>(block.getByPosition(new_arguments[1]).type.get()))
|
||||||
|
std::swap(new_arguments[0], new_arguments[1]);
|
||||||
|
|
||||||
|
const ColumnAggregateFunction * column = typeid_cast<const ColumnAggregateFunction *>(block.getByPosition(new_arguments[0]).column.get());
|
||||||
|
IAggregateFunction * function = column->getAggregateFunction().get();
|
||||||
|
|
||||||
|
auto arena = std::make_shared<Arena>();
|
||||||
|
|
||||||
|
auto column_to = ColumnAggregateFunction::create(column->getAggregateFunction(), Arenas(1, arena));
|
||||||
|
column_to->reserve(input_rows_count);
|
||||||
|
|
||||||
|
auto column_from = ColumnAggregateFunction::create(column->getAggregateFunction(), Arenas(1, arena));
|
||||||
|
column_from->reserve(input_rows_count);
|
||||||
|
|
||||||
|
for (size_t i = 0; i < input_rows_count; ++i)
|
||||||
|
{
|
||||||
|
column_to->insertDefault();
|
||||||
|
column_from->insertFrom(column->getData()[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
auto & vec_to = column_to->getData();
|
||||||
|
auto & vec_from = column_from->getData();
|
||||||
|
|
||||||
|
UInt64 m = block.getByPosition(new_arguments[1]).column->getUInt(0);
|
||||||
|
|
||||||
|
/// We use exponentiation by squaring algorithm to perform multiplying aggregate states by N in O(log(N)) operations
|
||||||
|
/// https://en.wikipedia.org/wiki/Exponentiation_by_squaring
|
||||||
|
while (m)
|
||||||
|
{
|
||||||
|
if (m % 2)
|
||||||
|
{
|
||||||
|
for (size_t i = 0; i < input_rows_count; ++i)
|
||||||
|
function->merge(vec_to[i], vec_from[i], arena.get());
|
||||||
|
--m;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
for (size_t i = 0; i < input_rows_count; ++i)
|
||||||
|
function->merge(vec_from[i], vec_from[i], arena.get());
|
||||||
|
m /= 2;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
block.getByPosition(result).column = std::move(column_to);
|
||||||
|
}
|
||||||
|
|
||||||
|
void executeAggregateAddition(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) const
|
||||||
|
{
|
||||||
|
const ColumnAggregateFunction * columns[2];
|
||||||
|
for (size_t i = 0; i < 2; ++i)
|
||||||
|
columns[i] = typeid_cast<const ColumnAggregateFunction *>(block.getByPosition(arguments[i]).column.get());
|
||||||
|
|
||||||
|
auto arena = std::make_shared<Arena>();
|
||||||
|
auto column_to = ColumnAggregateFunction::create(columns[0]->getAggregateFunction(), Arenas(1, arena));
|
||||||
|
column_to->reserve(input_rows_count);
|
||||||
|
|
||||||
|
for(size_t i = 0; i < input_rows_count; ++i)
|
||||||
|
{
|
||||||
|
column_to->insertFrom(columns[0]->getData()[i]);
|
||||||
|
column_to->insertMergeFrom(columns[1]->getData()[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
block.getByPosition(result).column = std::move(column_to);
|
||||||
|
}
|
||||||
|
|
||||||
|
void executeDateTimeIntervalPlusMinus(Block & block, const ColumnNumbers & arguments,
|
||||||
|
size_t result, size_t input_rows_count, const FunctionBuilderPtr & function_builder) const
|
||||||
|
{
|
||||||
|
ColumnNumbers new_arguments = arguments;
|
||||||
|
|
||||||
|
/// Interval argument must be second.
|
||||||
|
if (checkDataType<DataTypeInterval>(block.getByPosition(arguments[0]).type.get()))
|
||||||
|
std::swap(new_arguments[0], new_arguments[1]);
|
||||||
|
|
||||||
|
/// Change interval argument type to its representation
|
||||||
|
Block new_block = block;
|
||||||
|
new_block.getByPosition(new_arguments[1]).type = std::make_shared<DataTypeNumber<DataTypeInterval::FieldType>>();
|
||||||
|
|
||||||
|
ColumnsWithTypeAndName new_arguments_with_type_and_name =
|
||||||
|
{new_block.getByPosition(new_arguments[0]), new_block.getByPosition(new_arguments[1])};
|
||||||
|
auto function = function_builder->build(new_arguments_with_type_and_name);
|
||||||
|
|
||||||
|
function->execute(new_block, new_arguments, result, input_rows_count);
|
||||||
|
block.getByPosition(result).column = new_block.getByPosition(result).column;
|
||||||
|
}
|
||||||
|
|
||||||
public:
|
public:
|
||||||
static constexpr auto name = Name::name;
|
static constexpr auto name = Name::name;
|
||||||
static FunctionPtr create(const Context & context) { return std::make_shared<FunctionBinaryArithmetic>(context); }
|
static FunctionPtr create(const Context & context) { return std::make_shared<FunctionBinaryArithmetic>(context); }
|
||||||
@ -1185,11 +1274,11 @@ public:
|
|||||||
for (size_t i = 0; i < 2; ++i)
|
for (size_t i = 0; i < 2; ++i)
|
||||||
new_arguments[i] = typeid_cast<const DataTypeAggregateFunction *>(arguments[i].get());
|
new_arguments[i] = typeid_cast<const DataTypeAggregateFunction *>(arguments[i].get());
|
||||||
|
|
||||||
if (new_arguments[0]->getFunctionName() != new_arguments[1]->getFunctionName())
|
if (!new_arguments[0]->equals(*new_arguments[1]))
|
||||||
throw Exception("Cannot add aggregate states of different functions: "
|
throw Exception("Cannot add aggregate states of different functions: "
|
||||||
+ new_arguments[0]->getFunctionName() + " and " + new_arguments[1]->getFunctionName(), ErrorCodes::CANNOT_ADD_DIFFERENT_AGGREGATE_STATES);
|
+ new_arguments[0]->getFunctionName() + " and " + new_arguments[1]->getFunctionName(), ErrorCodes::CANNOT_ADD_DIFFERENT_AGGREGATE_STATES);
|
||||||
|
|
||||||
if (new_arguments[0]->getReturnType()->getName() != new_arguments[1]->getReturnType()->getName())
|
if (!new_arguments[0]->getReturnType()->equals(*new_arguments[1]->getReturnType().get()))
|
||||||
throw Exception("Cannot add aggregate states with different return types: "
|
throw Exception("Cannot add aggregate states with different return types: "
|
||||||
+ new_arguments[0]->getReturnType()->getName() + " and " + new_arguments[1]->getReturnType()->getName(), ErrorCodes::CANNOT_ADD_DIFFERENT_AGGREGATE_STATES);
|
+ new_arguments[0]->getReturnType()->getName() + " and " + new_arguments[1]->getReturnType()->getName(), ErrorCodes::CANNOT_ADD_DIFFERENT_AGGREGATE_STATES);
|
||||||
|
|
||||||
@ -1251,95 +1340,21 @@ public:
|
|||||||
/// Special case when multiply aggregate function state
|
/// Special case when multiply aggregate function state
|
||||||
if (isAggregateMultiply(block.getByPosition(arguments[0]).type, block.getByPosition(arguments[1]).type))
|
if (isAggregateMultiply(block.getByPosition(arguments[0]).type, block.getByPosition(arguments[1]).type))
|
||||||
{
|
{
|
||||||
ColumnNumbers new_arguments = arguments;
|
executeAggregateMultiply(block, arguments, result, input_rows_count);
|
||||||
if (checkDataType<DataTypeAggregateFunction>(block.getByPosition(new_arguments[1]).type.get()))
|
|
||||||
std::swap(new_arguments[0], new_arguments[1]);
|
|
||||||
|
|
||||||
const ColumnAggregateFunction * column = typeid_cast<const ColumnAggregateFunction *>(block.getByPosition(new_arguments[0]).column.get());
|
|
||||||
IAggregateFunction * function = column->getAggregateFunction().get();
|
|
||||||
|
|
||||||
auto arena = std::make_shared<Arena>();
|
|
||||||
|
|
||||||
auto column_to = ColumnAggregateFunction::create(column->getAggregateFunction(), Arenas(1, arena));
|
|
||||||
column_to->reserve(input_rows_count);
|
|
||||||
|
|
||||||
auto column_from = ColumnAggregateFunction::create(column->getAggregateFunction(), Arenas(1, arena));
|
|
||||||
column_from->reserve(input_rows_count);
|
|
||||||
|
|
||||||
for (size_t i = 0; i < input_rows_count; ++i)
|
|
||||||
{
|
|
||||||
column_to->insertDefault();
|
|
||||||
column_from->insertFrom(column->getData()[i]);
|
|
||||||
}
|
|
||||||
|
|
||||||
auto & vec_to = column_to->getData();
|
|
||||||
auto & vec_from = column_from->getData();
|
|
||||||
|
|
||||||
UInt64 m = block.getByPosition(new_arguments[1]).column->getUInt(0);
|
|
||||||
|
|
||||||
/// We use exponentiation by squaring algorithm to perform multiplying aggregate states by N in O(log(N)) operations
|
|
||||||
/// https://en.wikipedia.org/wiki/Exponentiation_by_squaring
|
|
||||||
while (m)
|
|
||||||
{
|
|
||||||
if (m % 2)
|
|
||||||
{
|
|
||||||
for (size_t i = 0; i < input_rows_count; ++i)
|
|
||||||
function->merge(vec_to[i], vec_from[i], arena.get());
|
|
||||||
--m;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
for (size_t i = 0; i < input_rows_count; ++i)
|
|
||||||
function->merge(vec_from[i], vec_from[i], arena.get());
|
|
||||||
m /= 2;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
block.getByPosition(result).column = std::move(column_to);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Special case - addition of two aggregate functions states
|
/// Special case - addition of two aggregate functions states
|
||||||
if (isAggregateAddition(block.getByPosition(arguments[0]).type, block.getByPosition(arguments[1]).type))
|
if (isAggregateAddition(block.getByPosition(arguments[0]).type, block.getByPosition(arguments[1]).type))
|
||||||
{
|
{
|
||||||
const ColumnAggregateFunction * columns[2];
|
executeAggregateAddition(block, arguments, result, input_rows_count);
|
||||||
for (size_t i = 0; i < 2; ++i)
|
|
||||||
columns[i] = typeid_cast<const ColumnAggregateFunction *>(block.getByPosition(arguments[i]).column.get());
|
|
||||||
|
|
||||||
auto arena = std::make_shared<Arena>();
|
|
||||||
auto column_to = ColumnAggregateFunction::create(columns[0]->getAggregateFunction(), Arenas(1, arena));
|
|
||||||
column_to->reserve(input_rows_count);
|
|
||||||
|
|
||||||
for(size_t i = 0; i < input_rows_count; ++i)
|
|
||||||
{
|
|
||||||
column_to->insertFrom(columns[0]->getData()[i]);
|
|
||||||
column_to->insertMergeFrom(columns[1]->getData()[i]);
|
|
||||||
}
|
|
||||||
|
|
||||||
block.getByPosition(result).column = std::move(column_to);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Special case when the function is plus or minus, one of arguments is Date/DateTime and another is Interval.
|
/// Special case when the function is plus or minus, one of arguments is Date/DateTime and another is Interval.
|
||||||
if (auto function_builder = getFunctionForIntervalArithmetic(block.getByPosition(arguments[0]).type, block.getByPosition(arguments[1]).type))
|
if (auto function_builder = getFunctionForIntervalArithmetic(block.getByPosition(arguments[0]).type, block.getByPosition(arguments[1]).type))
|
||||||
{
|
{
|
||||||
ColumnNumbers new_arguments = arguments;
|
executeDateTimeIntervalPlusMinus(block, arguments, result, input_rows_count, function_builder);
|
||||||
|
|
||||||
/// Interval argument must be second.
|
|
||||||
if (checkDataType<DataTypeInterval>(block.getByPosition(arguments[0]).type.get()))
|
|
||||||
std::swap(new_arguments[0], new_arguments[1]);
|
|
||||||
|
|
||||||
/// Change interval argument type to its representation
|
|
||||||
Block new_block = block;
|
|
||||||
new_block.getByPosition(new_arguments[1]).type = std::make_shared<DataTypeNumber<DataTypeInterval::FieldType>>();
|
|
||||||
|
|
||||||
ColumnsWithTypeAndName new_arguments_with_type_and_name =
|
|
||||||
{new_block.getByPosition(new_arguments[0]), new_block.getByPosition(new_arguments[1])};
|
|
||||||
auto function = function_builder->build(new_arguments_with_type_and_name);
|
|
||||||
|
|
||||||
function->execute(new_block, new_arguments, result, input_rows_count);
|
|
||||||
block.getByPosition(result).column = new_block.getByPosition(result).column;
|
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user