diff --git a/src/AggregateFunctions/AggregateFunctionDeltaSum.h b/src/AggregateFunctions/AggregateFunctionDeltaSum.h index 95f97d079fd..e83ff6ea140 100644 --- a/src/AggregateFunctions/AggregateFunctionDeltaSum.h +++ b/src/AggregateFunctions/AggregateFunctionDeltaSum.h @@ -19,9 +19,11 @@ namespace DB template struct AggregationFunctionDeltaSumData { - T sum{}; - T last{}; - T first{}; + T sum = 0; + bool seen_last = false; + T last; + bool seen_first = false; + T first; }; template @@ -50,26 +52,29 @@ public: { const T & value = (*columns[0])[row_num].get(); - if (this->data(place).last < value) { + if (this->data(place).last < value && this->data(place).seen_last) { this->data(place).sum += (value - this->data(place).last); } this->data(place).last = value; + this->data(place).seen_last = true; - if (this->data(place).first == 0) { + if (this->data(place).seen_first == false) { this->data(place).first = value; + this->data(place).seen_first = true; } } void ALWAYS_INLINE merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override { - if (this->data(place).last < this->data(rhs).first) { + if ((this->data(place).last < this->data(rhs).first) && this->data(place).seen_last && this->data(place).seen_first) { this->data(place).sum += this->data(rhs).sum + (this->data(rhs).first - this->data(place).last); } else { this->data(place).sum += this->data(rhs).sum; } this->data(place).last = this->data(rhs).last; + this->data(place).first = this->data(rhs).first; } void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override @@ -77,6 +82,8 @@ public: writeIntBinary(this->data(place).sum, buf); writeIntBinary(this->data(place).first, buf); writeIntBinary(this->data(place).last, buf); + writePODBinary(this->data(place).seen_first, buf); + writePODBinary(this->data(place).seen_last, buf); } void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override @@ -84,6 +91,8 @@ public: readIntBinary(this->data(place).sum, buf); readIntBinary(this->data(place).first, buf); readIntBinary(this->data(place).last, buf); + readPODBinary(this->data(place).seen_first, buf); + readPODBinary(this->data(place).seen_last, buf); } void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override