ClickHouse/src/AggregateFunctions/IAggregateFunction.h

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

846 lines
33 KiB
C++
Raw Normal View History

2011-09-19 03:40:05 +00:00
#pragma once
#include <Columns/ColumnSparse.h>
#include <Columns/ColumnTuple.h>
#include <Columns/ColumnsNumber.h>
#include <Core/Block.h>
#include <Core/ColumnNumbers.h>
#include <Core/Field.h>
#include <Interpreters/Context_fwd.h>
2021-10-02 07:13:14 +00:00
#include <base/types.h>
#include <Common/Exception.h>
#include <Common/ThreadPool_fwd.h>
2022-11-28 15:02:59 +00:00
#include <Core/IResolvedFunction.h>
#include "config.h"
2021-05-31 08:05:40 +00:00
2017-04-16 04:03:14 +00:00
#include <cstddef>
2011-12-19 08:06:31 +00:00
#include <memory>
#include <vector>
2021-04-20 21:22:29 +00:00
#include <type_traits>
2011-12-19 08:06:31 +00:00
2021-05-31 08:05:40 +00:00
/// Forward declarations of LLVM types, so that the JIT-compilation hooks of
/// IAggregateFunction can name them without pulling LLVM headers into this file.
namespace llvm
{
    class IRBuilderBase;
    class LLVMContext;
    class Value;
}
namespace DB
{
struct Settings;

namespace ErrorCodes
{
    extern const int NOT_IMPLEMENTED;
}

class Arena;
class ReadBuffer;
class WriteBuffer;
class IColumn;
class IDataType;
class IWindowFunction;

using DataTypePtr = std::shared_ptr<const IDataType>;
using DataTypes = std::vector<DataTypePtr>;

/// Untyped pointers to the memory of an aggregate function state.
/// The state itself is created/destroyed through IAggregateFunction::create()/destroy().
using AggregateDataPtr = char *;
using ConstAggregateDataPtr = const char *;

class IAggregateFunction;
using AggregateFunctionPtr = std::shared_ptr<const IAggregateFunction>;

struct AggregateFunctionProperties;
/** Aggregate functions interface.
* Instances of classes with this interface do not contain the data itself for aggregation,
* but contain only metadata (description) of the aggregate function,
* as well as methods for creating, deleting and working with data.
* The data resulting from the aggregation (intermediate computing states) is stored in other objects
* (which can be created in some memory pool),
2017-03-09 00:56:38 +00:00
* and IAggregateFunction is the external interface for manipulating them.
2011-09-19 03:40:05 +00:00
*/
2022-11-28 15:02:59 +00:00
class IAggregateFunction : public std::enable_shared_from_this<IAggregateFunction>, public IResolvedFunction
2011-09-19 03:40:05 +00:00
{
public:
2022-11-28 15:02:59 +00:00
IAggregateFunction(const DataTypes & argument_types_, const Array & parameters_, const DataTypePtr & result_type_)
2022-12-23 18:23:01 +00:00
: argument_types(argument_types_)
2022-11-28 15:02:59 +00:00
, parameters(parameters_)
2022-12-23 18:23:01 +00:00
, result_type(result_type_)
2022-11-28 15:02:59 +00:00
{}
2019-02-11 19:26:32 +00:00
2017-03-09 00:56:38 +00:00
/// Get main function name.
2011-09-19 03:40:05 +00:00
virtual String getName() const = 0;
2021-06-06 21:49:55 +00:00
/// Get the data type of internal state. By default it is AggregateFunction(name(params), argument_types...).
virtual DataTypePtr getStateType() const;
/// Same as the above but normalize state types so that variants with the same binary representation will use the same type.
virtual DataTypePtr getNormalizedStateType() const { return getStateType(); }
/// Returns true if two aggregate functions have the same state representation in memory and the same serialization,
/// so state of one aggregate function can be safely used with another.
/// Examples:
/// - quantile(x), quantile(a)(x), quantile(b)(x) - parameter doesn't affect state and used for finalization only
/// - foo(x) and fooIf(x) - If combinator doesn't affect state
/// By default returns true only if functions have exactly the same names, combinators and parameters.
2022-07-21 15:08:42 +00:00
bool haveSameStateRepresentation(const IAggregateFunction & rhs) const;
virtual bool haveSameStateRepresentationImpl(const IAggregateFunction & rhs) const;
virtual const IAggregateFunction & getBaseAggregateFunctionWithSameStateRepresentation() const { return *this; }
bool haveEqualArgumentTypes(const IAggregateFunction & rhs) const;
/// Get type which will be used for prediction result in case if function is an ML method.
virtual DataTypePtr getReturnTypeToPredict() const
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Prediction is not supported for {}", getName());
}
2011-09-19 03:40:05 +00:00
2021-05-30 13:57:30 +00:00
virtual bool isVersioned() const { return false; }
virtual size_t getVersionFromRevision(size_t /* revision */) const { return 0; }
virtual size_t getDefaultVersion() const { return 0; }
2022-11-28 15:02:59 +00:00
~IAggregateFunction() override = default;
/** Data manipulating functions. */
2017-03-09 04:26:17 +00:00
/** Create empty data for aggregation with `placement new` at the specified location.
* You will have to destroy them using the `destroy` method.
*/
virtual void create(AggregateDataPtr __restrict place) const = 0;
2017-03-09 04:26:17 +00:00
/// Delete data for aggregation.
virtual void destroy(AggregateDataPtr __restrict place) const noexcept = 0;
/// Delete all combinator states that were used after combinator -State.
/// For example for uniqArrayStateForEachMap(...) it will destroy
/// states that were created by combinators Map and ForEach.
/// It's needed because ColumnAggregateFunction in the result will be
/// responsible only for destruction of states that were created
/// by aggregate function and all combinators before -State combinator.
virtual void destroyUpToState(AggregateDataPtr __restrict place) const noexcept
{
destroy(place);
}
2017-03-09 00:56:38 +00:00
/// It is not necessary to delete data.
virtual bool hasTrivialDestructor() const = 0;
2017-03-09 04:26:17 +00:00
/// Get `sizeof` of structure with data.
virtual size_t sizeOfData() const = 0;
2020-08-07 00:33:17 +00:00
/// How the data structure should be aligned.
virtual size_t alignOfData() const = 0;
2016-09-22 23:26:08 +00:00
/** Adds a value into aggregation data on which place points to.
* columns points to columns containing arguments of aggregation function.
* row_num is number of row which should be added.
* Additional parameter arena should be used instead of standard memory allocator if the addition requires memory allocation.
*/
virtual void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena * arena) const = 0;
2011-09-19 03:40:05 +00:00
/// Adds several default values of arguments into aggregation data on which place points to.
/// Default values must be a the 0-th positions in columns.
virtual void addManyDefaults(AggregateDataPtr __restrict place, const IColumn ** columns, size_t length, Arena * arena) const = 0;
2016-09-22 23:26:08 +00:00
/// Merges state (on which place points to) with other state of current aggregation function.
virtual void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena * arena) const = 0;
2011-09-19 03:40:05 +00:00
/// Tells if merge() with thread pool parameter could be used.
virtual bool isAbleToParallelizeMerge() const { return false; }
/// Should be used only if isAbleToParallelizeMerge() returned true.
virtual void
merge(AggregateDataPtr __restrict /*place*/, ConstAggregateDataPtr /*rhs*/, ThreadPool & /*thread_pool*/, Arena * /*arena*/) const
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "merge() with thread pool parameter isn't implemented for {} ", getName());
}
2016-09-22 23:26:08 +00:00
/// Serializes state (to transmit it over the network, for example).
virtual void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> version = std::nullopt) const = 0; /// NOLINT
2011-09-19 03:40:05 +00:00
2016-09-22 23:26:08 +00:00
/// Deserializes state. This function is called only for empty (just created) states.
virtual void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, std::optional<size_t> version = std::nullopt, Arena * arena = nullptr) const = 0; /// NOLINT
2011-09-19 03:40:05 +00:00
/// Returns true if a function requires Arena to handle own states (see add(), merge(), deserialize()).
virtual bool allocatesMemoryInArena() const = 0;
2020-12-21 23:47:14 +00:00
/// Inserts results into a column. This method might modify the state (e.g.
/// sort an array), so must be called once, from single thread. The state
/// must remain valid though, and the subsequent calls to add/merge/
/// insertResultInto must work correctly. This kind of call sequence occurs
/// in `runningAccumulate`, or when calculating an aggregate function as a
/// window function.
virtual void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena * arena) const = 0;
2014-05-21 13:27:40 +00:00
/// Special method for aggregate functions with -State combinator, it behaves the same way as insertResultInto,
/// but if we need to insert AggregateData into ColumnAggregateFunction we use special method
/// insertInto that inserts default value and then performs merge with provided AggregateData
/// instead of just copying pointer to this AggregateData. Used in WindowTransform.
virtual void insertMergeResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena * arena) const
{
if (isState())
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Function {} is marked as State but method insertMergeResultInto is not implemented");
insertResultInto(place, to, arena);
}
/// Used for machine learning methods. Predict result from trained model.
/// Will insert result into `to` column for rows in range [offset, offset + limit).
virtual void predictValues(
ConstAggregateDataPtr __restrict /* place */,
IColumn & /*to*/,
const ColumnsWithTypeAndName & /*arguments*/,
size_t /*offset*/,
size_t /*limit*/,
2021-06-01 12:20:52 +00:00
ContextPtr /*context*/) const
2019-04-21 14:32:42 +00:00
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method predictValues is not supported for {}", getName());
2019-04-21 14:32:42 +00:00
}
2019-04-20 23:22:42 +00:00
/** Returns true for aggregate functions of type -State
2017-03-09 00:56:38 +00:00
* They are executed as other aggregate functions, but not finalized (return an aggregation state that can be combined with another).
2022-09-15 12:41:39 +00:00
* Also returns true when the final value of this aggregate function contains State of other aggregate function inside.
2014-06-03 20:29:04 +00:00
*/
virtual bool isState() const { return false; }
2019-11-11 08:36:19 +00:00
/** The inner loop that uses the function pointer is better than using the virtual function.
* The reason is that in the case of virtual functions GCC 5.1.2 generates code,
* which, at each iteration of the loop, reloads the function address (the offset value in the virtual function table) from memory to the register.
* This gives a performance drop on simple queries around 12%.
* After the appearance of better compilers, the code can be removed.
*/
using AddFunc = void (*)(const IAggregateFunction *, AggregateDataPtr, const IColumn **, size_t, Arena *);
2019-11-11 08:36:19 +00:00
virtual AddFunc getAddressOfAddFunction() const = 0;
2019-08-10 22:36:55 +00:00
/** Contains a loop with calls to "add" function. You can collect arguments into array "places"
2019-11-11 08:36:19 +00:00
* and do a single call to "addBatch" for devirtualization and inlining.
2019-08-10 22:36:55 +00:00
*/
virtual void addBatch( /// NOLINT
size_t row_begin,
size_t row_end,
2020-11-15 10:05:52 +00:00
AggregateDataPtr * places,
size_t place_offset,
const IColumn ** columns,
Arena * arena,
2020-12-22 15:35:07 +00:00
ssize_t if_argument_pos = -1) const = 0;
2019-08-10 22:36:55 +00:00
2021-05-21 00:57:11 +00:00
/// The version of "addBatch", that handle sparse columns as arguments.
2021-03-12 16:33:41 +00:00
virtual void addBatchSparse(
size_t row_begin,
size_t row_end,
2021-03-12 16:33:41 +00:00
AggregateDataPtr * places,
size_t place_offset,
const IColumn ** columns,
Arena * arena) const = 0;
virtual void mergeBatch(
size_t row_begin,
size_t row_end,
AggregateDataPtr * places,
size_t place_offset,
const AggregateDataPtr * rhs,
Arena * arena) const = 0;
/** The same for single place.
*/
virtual void addBatchSinglePlace( /// NOLINT
size_t row_begin,
size_t row_end,
AggregateDataPtr __restrict place,
const IColumn ** columns,
Arena * arena,
ssize_t if_argument_pos = -1) const = 0;
2021-05-21 00:57:11 +00:00
/// The version of "addBatchSinglePlace", that handle sparse columns as arguments.
2021-03-12 16:33:41 +00:00
virtual void addBatchSparseSinglePlace(
size_t row_begin,
size_t row_end,
AggregateDataPtr __restrict place,
const IColumn ** columns,
Arena * arena) const = 0;
2021-03-12 16:33:41 +00:00
/** The same for single place when need to aggregate only filtered data.
* Instead of using an if-column, the condition is combined inside the null_map
*/
virtual void addBatchSinglePlaceNotNull( /// NOLINT
size_t row_begin,
size_t row_end,
AggregateDataPtr __restrict place,
2020-11-15 10:05:52 +00:00
const IColumn ** columns,
const UInt8 * null_map,
Arena * arena,
2020-12-22 15:35:07 +00:00
ssize_t if_argument_pos = -1) const = 0;
virtual void addBatchSinglePlaceFromInterval( /// NOLINT
size_t row_begin,
size_t row_end,
AggregateDataPtr __restrict place,
const IColumn ** columns,
Arena * arena,
ssize_t if_argument_pos = -1)
2020-11-15 10:05:52 +00:00
const = 0;
2020-04-18 09:51:21 +00:00
2019-11-11 08:36:19 +00:00
/** In addition to addBatch, this method collects multiple rows of arguments into array "places"
* as long as they are between offsets[i-1] and offsets[i]. This is used for arrayReduce and
* -Array combinator. It might also be used generally to break data dependency when array
* "places" contains a large number of same values consecutively.
*/
virtual void addBatchArray(
size_t row_begin,
size_t row_end,
2021-04-20 21:22:29 +00:00
AggregateDataPtr * places,
size_t place_offset,
const IColumn ** columns,
const UInt64 * offsets,
Arena * arena) const = 0;
/** The case when the aggregation key is UInt8
* and pointers to aggregation states are stored in AggregateDataPtr[256] lookup table.
*/
virtual void addBatchLookupTable8(
size_t row_begin,
size_t row_end,
AggregateDataPtr * places,
size_t place_offset,
std::function<void(AggregateDataPtr &)> init,
const UInt8 * key,
const IColumn ** columns,
Arena * arena) const = 0;
2021-07-03 13:29:32 +00:00
/** Insert result of aggregate function into result column with batch size.
* The implementation of this method will destroy aggregate place up to -State if insert state into result column was successful.
2021-07-03 13:29:32 +00:00
* All places that were not inserted must be destroyed if there was exception during insert into result column.
*/
2021-06-30 11:44:45 +00:00
virtual void insertResultIntoBatch(
size_t row_begin,
size_t row_end,
AggregateDataPtr * places,
size_t place_offset,
IColumn & to,
Arena * arena) const = 0;
/** Destroy batch of aggregate places.
*/
virtual void destroyBatch(
size_t row_begin,
size_t row_end,
AggregateDataPtr * places,
size_t place_offset) const noexcept = 0;
2020-03-30 21:52:16 +00:00
/** By default all NULLs are skipped during aggregation.
* If it returns nullptr, the default one will be used.
* If an aggregate function wants to use something instead of the default one, it overrides this function and returns its own null adapter.
* nested_function is a smart pointer to this aggregate function itself.
* arguments and params are for nested_function.
*/
2020-06-14 07:44:02 +00:00
virtual AggregateFunctionPtr getOwnNullAdapter(
2021-04-20 21:22:29 +00:00
const AggregateFunctionPtr & /*nested_function*/, const DataTypes & /*arguments*/,
const Array & /*params*/, const AggregateFunctionProperties & /*properties*/) const
{
return nullptr;
}
/// For most functions if one of arguments is always NULL, we return NULL (it's implemented in combinator Null),
/// but in some functions we can want to process this argument somehow (for example condition argument in If combinator).
/// This method returns the set of argument indexes that can be always NULL, they will be skipped in combinator Null.
virtual std::unordered_set<size_t> getArgumentsThatCanBeOnlyNull() const
{
return {};
}
2020-11-15 10:05:52 +00:00
/** Return the nested function if this is an Aggregate Function Combinator.
* Otherwise return nullptr.
*/
virtual AggregateFunctionPtr getNestedFunction() const { return {}; }
2022-11-28 15:02:59 +00:00
const DataTypePtr & getResultType() const override { return result_type; }
const DataTypes & getArgumentTypes() const override { return argument_types; }
const Array & getParameters() const override { return parameters; }
2019-02-11 13:11:52 +00:00
2021-02-11 13:29:30 +00:00
// Any aggregate function can be calculated over a window, but there are some
// window functions such as rank() that require a different interface, e.g.
// because they don't respect the window frame, or need to be notified when
// a new peer group starts. They pretend to be normal aggregate functions,
// but will fail if you actually try to use them in Aggregator. The
// WindowTransform recognizes these functions and handles them differently.
// We could have a separate factory for window functions, and make all
// aggregate functions implement IWindowFunction interface and so on. This
// would be more logically correct, but more complex. We only have a handful
// of true window functions, so this hack-ish interface suffices.
2021-06-06 21:49:55 +00:00
virtual bool isOnlyWindowFunction() const { return false; }
2021-02-11 13:29:30 +00:00
2021-07-03 13:29:32 +00:00
/// Description of AggregateFunction in form of name(parameters)(argument_types).
String getDescription() const;
2021-06-06 15:43:03 +00:00
2021-06-30 11:44:45 +00:00
#if USE_EMBEDDED_COMPILER
2021-05-31 08:05:40 +00:00
2021-06-30 11:44:45 +00:00
/// Is function JIT compilable
2021-05-31 08:05:40 +00:00
virtual bool isCompilable() const { return false; }
2021-06-30 11:44:45 +00:00
/// compileCreate should generate code for initialization of aggregate function state in aggregate_data_ptr
virtual void compileCreate(llvm::IRBuilderBase & /*builder*/, llvm::Value * /*aggregate_data_ptr*/) const
{
2021-07-03 13:29:32 +00:00
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "{} is not JIT-compilable", getName());
}
2021-06-30 11:44:45 +00:00
/// compileAdd should generate code for updating aggregate function state stored in aggregate_data_ptr
2021-06-04 10:43:11 +00:00
virtual void compileAdd(llvm::IRBuilderBase & /*builder*/, llvm::Value * /*aggregate_data_ptr*/, const DataTypes & /*arguments_types*/, const std::vector<llvm::Value *> & /*arguments_values*/) const
{
2021-07-03 13:29:32 +00:00
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "{} is not JIT-compilable", getName());
}
2021-06-30 11:44:45 +00:00
/// compileMerge should generate code for merging aggregate function states stored in aggregate_data_dst_ptr and aggregate_data_src_ptr
virtual void compileMerge(llvm::IRBuilderBase & /*builder*/, llvm::Value * /*aggregate_data_dst_ptr*/, llvm::Value * /*aggregate_data_src_ptr*/) const
{
2021-07-03 13:29:32 +00:00
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "{} is not JIT-compilable", getName());
}
2021-06-30 11:44:45 +00:00
/// compileGetResult should generate code for getting result value from aggregate function state stored in aggregate_data_ptr
virtual llvm::Value * compileGetResult(llvm::IRBuilderBase & /*builder*/, llvm::Value * /*aggregate_data_ptr*/) const
2021-05-31 08:05:40 +00:00
{
2021-07-03 13:29:32 +00:00
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "{} is not JIT-compilable", getName());
2021-05-31 08:05:40 +00:00
}
2021-06-30 11:44:45 +00:00
#endif
2021-05-31 08:05:40 +00:00
2019-02-11 19:26:32 +00:00
protected:
2019-02-11 13:11:52 +00:00
DataTypes argument_types;
Array parameters;
2022-12-23 18:23:01 +00:00
DataTypePtr result_type;
};
/// Implement method to obtain an address of 'add' function.
template <typename Derived>
class IAggregateFunctionHelper : public IAggregateFunction
{
private:
static void addFree(const IAggregateFunction * that, AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena * arena)
{
static_cast<const Derived &>(*that).add(place, columns, row_num, arena);
}
2019-11-11 08:36:19 +00:00
public:
2022-11-28 15:02:59 +00:00
IAggregateFunctionHelper(const DataTypes & argument_types_, const Array & parameters_, const DataTypePtr & result_type_)
: IAggregateFunction(argument_types_, parameters_, result_type_) {}
2019-11-11 08:36:19 +00:00
AddFunc getAddressOfAddFunction() const override { return &addFree; }
void addManyDefaults(
AggregateDataPtr __restrict place,
const IColumn ** columns,
size_t length,
Arena * arena) const override
{
for (size_t i = 0; i < length; ++i)
static_cast<const Derived *>(this)->add(place, columns, 0, arena);
}
void addBatch( /// NOLINT
size_t row_begin,
size_t row_end,
2020-11-15 10:05:52 +00:00
AggregateDataPtr * places,
size_t place_offset,
const IColumn ** columns,
Arena * arena,
2020-12-22 15:35:07 +00:00
ssize_t if_argument_pos = -1) const override
{
2020-12-22 15:35:07 +00:00
if (if_argument_pos >= 0)
2020-11-15 10:05:52 +00:00
{
2020-12-22 15:35:07 +00:00
const auto & flags = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData();
for (size_t i = row_begin; i < row_end; ++i)
2020-11-15 10:05:52 +00:00
{
2021-03-18 09:31:14 +00:00
if (flags[i] && places[i])
2020-11-15 10:05:52 +00:00
static_cast<const Derived *>(this)->add(places[i] + place_offset, columns, i, arena);
}
}
else
{
for (size_t i = row_begin; i < row_end; ++i)
2021-03-18 09:31:14 +00:00
if (places[i])
static_cast<const Derived *>(this)->add(places[i] + place_offset, columns, i, arena);
2020-11-15 10:05:52 +00:00
}
}
2019-08-10 22:54:33 +00:00
2021-03-12 16:33:41 +00:00
void addBatchSparse(
size_t row_begin,
size_t row_end,
2021-03-12 16:33:41 +00:00
AggregateDataPtr * places,
size_t place_offset,
const IColumn ** columns,
Arena * arena) const override
{
const auto & column_sparse = assert_cast<const ColumnSparse &>(*columns[0]);
const auto * values = &column_sparse.getValuesColumn();
auto offset_it = column_sparse.getIterator(row_begin);
2022-05-28 12:23:05 +00:00
for (size_t i = row_begin; i < row_end; ++i, ++offset_it)
static_cast<const Derived *>(this)->add(places[offset_it.getCurrentRow()] + place_offset,
&values, offset_it.getValueIndex(), arena);
2021-03-12 16:33:41 +00:00
}
void mergeBatch(
size_t row_begin,
size_t row_end,
AggregateDataPtr * places,
size_t place_offset,
const AggregateDataPtr * rhs,
Arena * arena) const override
{
for (size_t i = row_begin; i < row_end; ++i)
if (places[i])
static_cast<const Derived *>(this)->merge(places[i] + place_offset, rhs[i], arena);
}
void addBatchSinglePlace( /// NOLINT
size_t row_begin,
size_t row_end,
AggregateDataPtr __restrict place,
const IColumn ** columns,
Arena * arena,
ssize_t if_argument_pos = -1) const override
2019-08-10 22:54:33 +00:00
{
2020-12-22 15:35:07 +00:00
if (if_argument_pos >= 0)
2020-11-15 10:05:52 +00:00
{
2020-12-22 15:35:07 +00:00
const auto & flags = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData();
for (size_t i = row_begin; i < row_end; ++i)
2020-11-15 10:05:52 +00:00
{
if (flags[i])
static_cast<const Derived *>(this)->add(place, columns, i, arena);
}
}
else
{
for (size_t i = row_begin; i < row_end; ++i)
2020-11-15 10:05:52 +00:00
static_cast<const Derived *>(this)->add(place, columns, i, arena);
}
2019-08-10 22:54:33 +00:00
}
2021-03-12 16:33:41 +00:00
void addBatchSparseSinglePlace(
size_t row_begin,
size_t row_end,
AggregateDataPtr __restrict place,
const IColumn ** columns,
Arena * arena) const override
2021-03-12 16:33:41 +00:00
{
const auto & column_sparse = assert_cast<const ColumnSparse &>(*columns[0]);
const auto * values = &column_sparse.getValuesColumn();
const auto & offsets = column_sparse.getOffsetsData();
2021-03-12 16:33:41 +00:00
auto from = std::lower_bound(offsets.begin(), offsets.end(), row_begin) - offsets.begin() + 1;
auto to = std::lower_bound(offsets.begin(), offsets.end(), row_end) - offsets.begin() + 1;
size_t num_defaults = (row_end - row_begin) - (to - from);
static_cast<const Derived *>(this)->addBatchSinglePlace(from, to, place, &values, arena, -1);
static_cast<const Derived *>(this)->addManyDefaults(place, &values, num_defaults, arena);
2021-03-12 16:33:41 +00:00
}
void addBatchSinglePlaceNotNull( /// NOLINT
size_t row_begin,
size_t row_end,
AggregateDataPtr __restrict place,
2020-11-15 10:05:52 +00:00
const IColumn ** columns,
const UInt8 * null_map,
Arena * arena,
2020-12-22 15:35:07 +00:00
ssize_t if_argument_pos = -1) const override
{
2020-12-22 15:35:07 +00:00
if (if_argument_pos >= 0)
2020-11-15 10:05:52 +00:00
{
2020-12-22 15:35:07 +00:00
const auto & flags = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData();
for (size_t i = row_begin; i < row_end; ++i)
2020-11-15 10:05:52 +00:00
if (!null_map[i] && flags[i])
static_cast<const Derived *>(this)->add(place, columns, i, arena);
}
else
{
for (size_t i = row_begin; i < row_end; ++i)
2020-11-15 10:05:52 +00:00
if (!null_map[i])
static_cast<const Derived *>(this)->add(place, columns, i, arena);
}
}
void addBatchSinglePlaceFromInterval( /// NOLINT
size_t row_begin,
size_t row_end,
AggregateDataPtr __restrict place,
const IColumn ** columns,
Arena * arena,
ssize_t if_argument_pos = -1)
2020-11-15 10:05:52 +00:00
const override
2020-04-18 09:51:21 +00:00
{
2020-12-22 15:35:07 +00:00
if (if_argument_pos >= 0)
2020-11-15 10:05:52 +00:00
{
2020-12-22 15:35:07 +00:00
const auto & flags = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData();
for (size_t i = row_begin; i < row_end; ++i)
2020-11-15 10:05:52 +00:00
{
if (flags[i])
static_cast<const Derived *>(this)->add(place, columns, i, arena);
}
}
else
{
for (size_t i = row_begin; i < row_end; ++i)
2020-11-15 10:05:52 +00:00
static_cast<const Derived *>(this)->add(place, columns, i, arena);
}
2020-04-18 09:51:21 +00:00
}
2019-11-11 08:36:19 +00:00
void addBatchArray(
size_t row_begin,
size_t row_end,
AggregateDataPtr * places,
size_t place_offset,
const IColumn ** columns,
const UInt64 * offsets,
Arena * arena)
2019-11-11 08:36:19 +00:00
const override
{
size_t current_offset = offsets[static_cast<ssize_t>(row_begin) - 1];
for (size_t i = row_begin; i < row_end; ++i)
{
size_t next_offset = offsets[i];
for (size_t j = current_offset; j < next_offset; ++j)
2021-03-18 09:31:14 +00:00
if (places[i])
static_cast<const Derived *>(this)->add(places[i] + place_offset, columns, j, arena);
current_offset = next_offset;
}
}
2020-07-29 23:10:11 +00:00
void addBatchLookupTable8(
size_t row_begin,
size_t row_end,
2020-07-29 23:10:11 +00:00
AggregateDataPtr * map,
size_t place_offset,
std::function<void(AggregateDataPtr &)> init,
const UInt8 * key,
const IColumn ** columns,
Arena * arena) const override
{
static constexpr size_t UNROLL_COUNT = 8;
size_t i = row_begin;
2020-07-29 23:10:11 +00:00
size_t size_unrolled = (row_end - row_begin) / UNROLL_COUNT * UNROLL_COUNT;
for (; i < size_unrolled; i += UNROLL_COUNT)
2020-07-29 23:10:11 +00:00
{
AggregateDataPtr places[UNROLL_COUNT];
for (size_t j = 0; j < UNROLL_COUNT; ++j)
{
AggregateDataPtr & place = map[key[i + j]];
if (unlikely(!place))
init(place);
places[j] = place;
}
for (size_t j = 0; j < UNROLL_COUNT; ++j)
static_cast<const Derived *>(this)->add(places[j] + place_offset, columns, i + j, arena);
}
for (; i < row_end; ++i)
2020-07-29 23:10:11 +00:00
{
AggregateDataPtr & place = map[key[i]];
if (unlikely(!place))
init(place);
static_cast<const Derived *>(this)->add(place + place_offset, columns, i, arena);
}
}
void insertResultIntoBatch(
size_t row_begin,
size_t row_end,
AggregateDataPtr * places,
size_t place_offset,
IColumn & to,
Arena * arena) const override
{
size_t batch_index = row_begin;
try
{
for (; batch_index < row_end; ++batch_index)
{
static_cast<const Derived *>(this)->insertResultInto(places[batch_index] + place_offset, to, arena);
/// For State AggregateFunction ownership of aggregate place is passed to result column after insert,
/// so we need to destroy all states up to state of -State combinator.
static_cast<const Derived *>(this)->destroyUpToState(places[batch_index] + place_offset);
}
}
catch (...)
{
for (size_t destroy_index = batch_index; destroy_index < row_end; ++destroy_index)
2021-07-03 13:29:32 +00:00
static_cast<const Derived *>(this)->destroy(places[destroy_index] + place_offset);
throw;
}
}
void destroyBatch(
size_t row_begin,
size_t row_end,
AggregateDataPtr * places,
size_t place_offset) const noexcept override
{
for (size_t i = row_begin; i < row_end; ++i)
{
static_cast<const Derived *>(this)->destroy(places[i] + place_offset);
}
}
};
/// Implements several methods for manipulation with data. T - type of structure with data for aggregation.
template <typename T, typename Derived>
class IAggregateFunctionDataHelper : public IAggregateFunctionHelper<Derived>
{
protected:
    using Data = T;

    static Data & data(AggregateDataPtr __restrict place) { return *reinterpret_cast<Data *>(place); }
    static const Data & data(ConstAggregateDataPtr __restrict place) { return *reinterpret_cast<const Data *>(place); }

public:
    // Derived class can `override` this to flag that DateTime64 is not supported.
    static constexpr bool DateTime64Supported = true;

    IAggregateFunctionDataHelper(const DataTypes & argument_types_, const Array & parameters_, const DataTypePtr & result_type_)
        : IAggregateFunctionHelper<Derived>(argument_types_, parameters_, result_type_)
    {
        /// To prevent derived classes changing the destroy() without updating hasTrivialDestructor() to match it
        /// Enforce that either both of them are changed or none are
        constexpr bool declares_destroy_and_has_trivial_destructor =
            std::is_same_v<decltype(&IAggregateFunctionDataHelper::destroy), decltype(&Derived::destroy)> ==
            std::is_same_v<decltype(&IAggregateFunctionDataHelper::hasTrivialDestructor), decltype(&Derived::hasTrivialDestructor)>;
        static_assert(declares_destroy_and_has_trivial_destructor,
            "destroy() and hasTrivialDestructor() methods of an aggregate function must be either both overridden or not");
    }

    void create(AggregateDataPtr __restrict place) const override /// NOLINT
    {
        new (place) Data;
    }

    void destroy(AggregateDataPtr __restrict place) const noexcept override
    {
        data(place).~Data();
    }

    bool hasTrivialDestructor() const override
    {
        return std::is_trivially_destructible_v<Data>;
    }

    size_t sizeOfData() const override
    {
        return sizeof(Data);
    }

    size_t alignOfData() const override
    {
        return alignof(Data);
    }

    /// Specialized version for small states: rows are first aggregated into UNROLL_COUNT private
    /// lookup tables of Data (one table per lane), and each non-empty entry is then merged into `map`.
    /// Falls back to the generic implementation when the state uses the arena, is larger than
    /// 16 bytes, or sizeOfData() differs from sizeof(Data).
    void addBatchLookupTable8(
        size_t row_begin,
        size_t row_end,
        AggregateDataPtr * map,
        size_t place_offset,
        std::function<void(AggregateDataPtr &)> init,
        const UInt8 * key,
        const IColumn ** columns,
        Arena * arena) const override
    {
        const Derived & func = *static_cast<const Derived *>(this);

        /// If the function is complex or too large, use more generic algorithm.

        if (func.allocatesMemoryInArena() || sizeof(Data) > 16 || func.sizeOfData() != sizeof(Data))
        {
            IAggregateFunctionHelper<Derived>::addBatchLookupTable8(row_begin, row_end, map, place_offset, init, key, columns, arena);
            return;
        }

        /// Will use UNROLL_COUNT number of lookup tables.

        static constexpr size_t UNROLL_COUNT = 4;

        std::unique_ptr<Data[]> places{new Data[256 * UNROLL_COUNT]};
        bool has_data[256 * UNROLL_COUNT]{}; /// Separate flags array to avoid heavy initialization.

        /// Absolute (exclusive) end of the longest prefix of [row_begin, row_end) whose length is a
        /// multiple of UNROLL_COUNT. It must be offset by row_begin because it is compared with row indexes.
        const size_t unrolled_end = row_begin + (row_end - row_begin) / UNROLL_COUNT * UNROLL_COUNT;

        size_t i = row_begin;

        /// Aggregate data into different lookup tables.
        for (; i < unrolled_end; i += UNROLL_COUNT)
        {
            for (size_t j = 0; j < UNROLL_COUNT; ++j)
            {
                size_t idx = j * 256 + key[i + j];
                if (unlikely(!has_data[idx]))
                {
                    new (&places[idx]) Data;
                    has_data[idx] = true;
                }
                func.add(reinterpret_cast<char *>(&places[idx]), columns, i + j, nullptr);
            }
        }

        /// Merge data from every lookup table to the final destination.

        for (size_t k = 0; k < 256; ++k)
        {
            for (size_t j = 0; j < UNROLL_COUNT; ++j)
            {
                size_t idx = j * 256 + k;
                if (has_data[idx])
                {
                    AggregateDataPtr & place = map[k];
                    if (unlikely(!place))
                        init(place);

                    func.merge(place + place_offset, reinterpret_cast<const char *>(&places[idx]), nullptr);
                }
            }
        }

        /// Process tails and add directly to the final destination.

        for (; i < row_end; ++i)
        {
            size_t k = key[i];
            AggregateDataPtr & place = map[k];
            if (unlikely(!place))
                init(place);

            func.add(place + place_offset, columns, i, nullptr);
        }
    }
};
/// Properties of aggregate function that are independent of argument types and parameters.
struct AggregateFunctionProperties
{
    /** When the function is wrapped with Null combinator,
      * should we return Nullable type with NULL when no values were aggregated
      * or we should return non-Nullable type with default value (example: count, countDistinct).
      */
    bool returns_default_when_only_null = false;

    /** Result varies depending on the data order (example: groupArray).
      * Some may also name this property as "non-commutative".
      */
    bool is_order_dependent = false;

    /// Indicates if it's actually window function.
    bool is_window_function = false;
};
}