#pragma once

/// NOTE(review): in this copy the header names of the #include directives below, and the template
/// parameter/argument lists throughout the file (e.g. `template struct`, `static_cast &>`,
/// `std::make_shared()`, `std::vector;`), appear to have been stripped. TODO restore them from
/// version control; the comments below describe only what the remaining code shows.
#include
#include
#include
#include
#include
#include
#include
#include

namespace DB
{

/** Aggregation state used by both `quantile` and `quantiles` below.
  * It holds one `ReservoirSampler` (`sample`):
  *  - values are inserted into it by `addImpl`;
  *  - it is combined with another state by `merge`;
  *  - it is written and read by `serialize` / `deserialize`.
  */
template struct AggregateFunctionQuantileData
{
    using Sample = ReservoirSampler;
    Sample sample;  /// TODO Add MemoryTracker
};


/** Approximately calculates the quantile.
  * The argument type can only be a numeric type (including date and date-time).
  * If returns_float = true, the result type is Float64; otherwise the result type is the same as the argument type.
  * For dates and date-time, returns_float should be set to false.
  *
  * Takes one parameter, the quantile level (0.5 by default, i.e. the median), e.g. quantile(0.9)(x).
  */
template class AggregateFunctionQuantile final : public IUnaryAggregateFunction, AggregateFunctionQuantile >
{
private:
    using Sample = typename AggregateFunctionQuantileData::Sample;

    /// Quantile level: 0.5 unless overridden by the constructor or by setParameters().
    double level;
    /// Result type chosen by setArgument() and reported by getReturnType().
    DataTypePtr type;

public:
    AggregateFunctionQuantile(double level_ = 0.5) : level(level_) {}

    String getName() const override { return "quantile"; }

    DataTypePtr getReturnType() const override
    {
        return type;
    }

    /// Chooses the result type.
    /// With returns_float set, it is a newly created type (presumably Float64, per the class
    /// comment; the template argument is missing in this copy). Otherwise it is the argument's own type.
    void setArgument(const DataTypePtr & argument)
    {
        if (returns_float)
            type = std::make_shared();
        else
            type = argument;
    }

    /// Reads the quantile level from the single function parameter, converted to a number.
    /// @throws Exception with NUMBER_OF_ARGUMENTS_DOESNT_MATCH if the parameter count is not exactly 1.
    /// NOTE(review): the level is not range-checked here. Confirm how values outside [0, 1]
    /// are handled by the sampler.
    void setParameters(const Array & params) override
    {
        if (params.size() != 1)
            throw Exception("Aggregate function " + getName() + " requires exactly one parameter.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

        level = applyVisitor(FieldVisitorConvertToNumber(), params[0]);
    }

    /// Inserts the value at row_num of the argument column into this state's sample.
    void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num, Arena *) const
    {
        this->data(place).sample.insert(static_cast &>(column).getData()[row_num]);
    }

    /// Merges the sample stored at rhs into the sample stored at place. The arena is not used.
    void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
    {
        this->data(place).sample.merge(this->data(rhs).sample);
    }

    /// Writes the state's sample to buf.
    void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
    {
        this->data(place).sample.write(buf);
    }

    /// Reads the state's sample back from buf.
    void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
    {
        this->data(place).sample.read(buf);
    }

    /// Appends one value to the result column: the interpolated quantile of the sample at `level`.
    /// In the non-float case it is pushed into the argument-typed column, so it is presumably
    /// converted to that element type.
    void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
    {
        /// `Sample` may get sorted while a quantile is being computed. That is not considered
        /// a violation of (logical) constness here, hence the const_cast.
        Sample & sample = const_cast(this->data(place).sample);

        if (returns_float)
            static_cast(to).getData().push_back(sample.quantileInterpolated(level));
        else
            static_cast &>(to).getData().push_back(sample.quantileInterpolated(level));
    }
};


/** The same, but allows you to calculate several quantiles at once.
  * For this, it takes several levels as parameters. Example: quantiles(0.5, 0.8, 0.9, 0.95)(ConnectTiming).
  * Returns an array of results, one per level, in parameter order.
  */
template class AggregateFunctionQuantiles final : public IUnaryAggregateFunction, AggregateFunctionQuantiles >
{
private:
    using Sample = typename AggregateFunctionQuantileData::Sample;

    using Levels = std::vector;
    /// Quantile levels, one per function parameter, in parameter order.
    Levels levels;
    /// Element type of the result array, chosen by setArgument().
    DataTypePtr type;

public:
    String getName() const override { return "quantiles"; }

    /// Builds the result type by wrapping the element type.
    /// It is presumably an array type (the template argument is missing in this copy).
    /// A new object is created on every call.
    DataTypePtr getReturnType() const override
    {
        return std::make_shared(type);
    }

    /// Chooses the element type.
    /// With returns_float set, it is a newly created type (presumably Float64).
    /// Otherwise it is the argument's own type.
    void setArgument(const DataTypePtr & argument)
    {
        if (returns_float)
            type = std::make_shared();
        else
            type = argument;
    }

    /// Reads one quantile level per parameter, each converted to a number.
    /// @throws Exception with NUMBER_OF_ARGUMENTS_DOESNT_MATCH if no parameters are given.
    /// NOTE(review): levels are not range-checked here (same as in AggregateFunctionQuantile).
    void setParameters(const Array & params) override
    {
        if (params.empty())
            throw Exception("Aggregate function " + getName() + " requires at least one parameter.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

        size_t size = params.size();
        levels.resize(size);

        for (size_t i = 0; i < size; ++i)
            levels[i] = applyVisitor(FieldVisitorConvertToNumber(), params[i]);
    }

    /// Inserts the value at row_num of the argument column into this state's sample.
    void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num, Arena *) const
    {
        this->data(place).sample.insert(static_cast &>(column).getData()[row_num]);
    }

    /// Merges the sample stored at rhs into the sample stored at place. The arena is not used.
    void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
    {
        this->data(place).sample.merge(this->data(rhs).sample);
    }

    /// Writes the state's sample to buf.
    void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
    {
        this->data(place).sample.write(buf);
    }

    /// Reads the state's sample back from buf.
    void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
    {
        this->data(place).sample.read(buf);
    }

    /// Appends one array to the result column `to` (treated as a ColumnArray).
    /// The array holds the interpolated quantile of the sample for each level in `levels`.
    void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
    {
        /// `Sample` may get sorted while a quantile is being computed. That is not considered
        /// a violation of (logical) constness here, hence the const_cast.
        Sample & sample = const_cast(this->data(place).sample);

        ColumnArray & arr_to = static_cast(to);
        ColumnArray::Offsets_t & offsets_to = arr_to.getOffsets();

        size_t size = levels.size();
        /// New offset = the last existing offset (0 if there is none) plus the number of levels,
        /// i.e. room for exactly `size` values appended below.
        /// NOTE(review): the offset is pushed before the values are appended. If
        /// quantileInterpolated can throw, offsets and nested data would disagree; confirm.
        offsets_to.push_back((offsets_to.size() == 0 ? 0 : offsets_to.back()) + size);

        if (returns_float)
        {
            ColumnFloat64::Container_t & data_to = static_cast(arr_to.getData()).getData();

            for (size_t i = 0; i < size; ++i)
                data_to.push_back(sample.quantileInterpolated(levels[i]));
        }
        else
        {
            typename ColumnVector::Container_t & data_to = static_cast &>(arr_to.getData()).getData();

            for (size_t i = 0; i < size; ++i)
                data_to.push_back(sample.quantileInterpolated(levels[i]));
        }
    }
};

}