record seen first and last

This commit is contained in:
Russ Frank 2021-02-04 00:11:43 -05:00
parent 3913f39211
commit b36d20ef6e

View File

@ -19,9 +19,11 @@ namespace DB
template <typename T>
struct AggregationFunctionDeltaSumData
{
T sum{};
T last{};
T first{};
T sum = 0;
bool seen_last = false;
T last;
bool seen_first = false;
T first;
};
template <typename T>
@ -50,26 +52,29 @@ public:
{
const T & value = (*columns[0])[row_num].get<T>();
if (this->data(place).last < value) {
if (this->data(place).last < value && this->data(place).seen_last) {
this->data(place).sum += (value - this->data(place).last);
}
this->data(place).last = value;
this->data(place).seen_last = true;
if (this->data(place).first == 0) {
if (this->data(place).seen_first == false) {
this->data(place).first = value;
this->data(place).seen_first = true;
}
}
void ALWAYS_INLINE merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override
{
if (this->data(place).last < this->data(rhs).first) {
if ((this->data(place).last < this->data(rhs).first) && this->data(place).seen_last && this->data(place).seen_first) {
this->data(place).sum += this->data(rhs).sum + (this->data(rhs).first - this->data(place).last);
} else {
this->data(place).sum += this->data(rhs).sum;
}
this->data(place).last = this->data(rhs).last;
this->data(place).first = this->data(rhs).first;
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
@ -77,6 +82,8 @@ public:
writeIntBinary(this->data(place).sum, buf);
writeIntBinary(this->data(place).first, buf);
writeIntBinary(this->data(place).last, buf);
writePODBinary<bool>(this->data(place).seen_first, buf);
writePODBinary<bool>(this->data(place).seen_last, buf);
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
@ -84,6 +91,8 @@ public:
readIntBinary(this->data(place).sum, buf);
readIntBinary(this->data(place).first, buf);
readIntBinary(this->data(place).last, buf);
readPODBinary<bool>(this->data(place).seen_first, buf);
readPODBinary<bool>(this->data(place).seen_last, buf);
}
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override