From 43825cdf9590e5f50884966c88b2346d60a4e3bf Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sun, 30 Jun 2013 11:38:46 +0000 Subject: [PATCH] dbms: improved performance of finalizing aggregate functions [#CONV-2944]. --- .../AggregateFunctions/AggregateFunctionAny.h | 4 +- .../AggregateFunctionAnyLast.h | 4 +- .../AggregateFunctions/AggregateFunctionAvg.h | 10 +++-- .../AggregateFunctionCount.h | 4 +- .../AggregateFunctionGroupArray.h | 4 +- .../AggregateFunctionQuantile.h | 41 ++++++++++++------- .../AggregateFunctions/AggregateFunctionSum.h | 8 ++-- .../AggregateFunctionUniq.h | 19 +++++---- .../AggregateFunctionsMinMax.h | 4 +- .../AggregateFunctions/IAggregateFunction.h | 4 +- .../FinalizingAggregatedBlockInputStream.h | 9 ++-- 11 files changed, 63 insertions(+), 48 deletions(-) diff --git a/dbms/include/DB/AggregateFunctions/AggregateFunctionAny.h b/dbms/include/DB/AggregateFunctions/AggregateFunctionAny.h index 4c53a936f77..76c04520f58 100644 --- a/dbms/include/DB/AggregateFunctions/AggregateFunctionAny.h +++ b/dbms/include/DB/AggregateFunctions/AggregateFunctionAny.h @@ -69,9 +69,9 @@ public: d.value = tmp; } - Field getResult(ConstAggregateDataPtr place) const + void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const { - return data(place).value; + to.insert(data(place).value); } }; diff --git a/dbms/include/DB/AggregateFunctions/AggregateFunctionAnyLast.h b/dbms/include/DB/AggregateFunctions/AggregateFunctionAnyLast.h index e31acfebec2..5e6650a4e75 100644 --- a/dbms/include/DB/AggregateFunctions/AggregateFunctionAnyLast.h +++ b/dbms/include/DB/AggregateFunctions/AggregateFunctionAnyLast.h @@ -56,9 +56,9 @@ public: type->deserializeBinary(data(place).value, buf); } - Field getResult(ConstAggregateDataPtr place) const + void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const { - return data(place).value; + to.insert(data(place).value); } }; diff --git a/dbms/include/DB/AggregateFunctions/AggregateFunctionAvg.h b/dbms/include/DB/AggregateFunctions/AggregateFunctionAvg.h index b0582040eac..9f9b424cb9a 100644 --- a/dbms/include/DB/AggregateFunctions/AggregateFunctionAvg.h +++ b/dbms/include/DB/AggregateFunctions/AggregateFunctionAvg.h @@ -70,9 +70,10 @@ public: this->data(place).count += tmp_count; } - Field getResult(ConstAggregateDataPtr place) const + void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const { - return static_cast(this->data(place).sum) / this->data(place).count; + static_cast(to).getData().push_back( + static_cast(this->data(place).sum) / this->data(place).count); } }; @@ -134,9 +135,10 @@ public: this->data(place).count += tmp_count; } - Field getResult(ConstAggregateDataPtr place) const + void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const { - return static_cast(this->data(place).sum) / this->data(place).count; + static_cast(to).getData().push_back( + static_cast(this->data(place).sum) / this->data(place).count); } }; diff --git a/dbms/include/DB/AggregateFunctions/AggregateFunctionCount.h b/dbms/include/DB/AggregateFunctions/AggregateFunctionCount.h index 95f2de2d991..3e63071e921 100644 --- a/dbms/include/DB/AggregateFunctions/AggregateFunctionCount.h +++ b/dbms/include/DB/AggregateFunctions/AggregateFunctionCount.h @@ -53,9 +53,9 @@ public: data(place).count += tmp; } - Field getResult(ConstAggregateDataPtr place) const + void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const { - return data(place).count; + static_cast(to).getData().push_back(data(place).count); } /// Для оптимизации diff --git a/dbms/include/DB/AggregateFunctions/AggregateFunctionGroupArray.h b/dbms/include/DB/AggregateFunctions/AggregateFunctionGroupArray.h index 4cff7a16f43..8b16abe5798 100644 --- a/dbms/include/DB/AggregateFunctions/AggregateFunctionGroupArray.h +++ b/dbms/include/DB/AggregateFunctions/AggregateFunctionGroupArray.h @@ -74,9 +74,9 @@ public: type->deserializeBinary(value[i], buf); } - Field getResult(ConstAggregateDataPtr place) const + void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const { - return data(place).value; + to.insert(data(place).value); } }; diff --git a/dbms/include/DB/AggregateFunctions/AggregateFunctionQuantile.h b/dbms/include/DB/AggregateFunctions/AggregateFunctionQuantile.h index fd5166a8820..fa60df72951 100644 --- a/dbms/include/DB/AggregateFunctions/AggregateFunctionQuantile.h +++ b/dbms/include/DB/AggregateFunctions/AggregateFunctionQuantile.h @@ -10,6 +10,8 @@ #include +#include + namespace DB { @@ -86,14 +88,15 @@ public: this->data(place).sample.merge(tmp_sample); } - Field getResult(ConstAggregateDataPtr place) const + void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const { /// Sample может отсортироваться при получении квантиля, но в этом контексте можно не считать это нарушением константности. + Sample & sample = const_cast(this->data(place).sample); if (returns_float) - return Float64(const_cast(this->data(place).sample).quantileInterpolated(level)); + static_cast(to).getData().push_back(sample.quantileInterpolated(level)); else - return typename NearestFieldType::Type(const_cast(this->data(place).sample).quantileInterpolated(level)); + static_cast &>(to).getData().push_back(sample.quantileInterpolated(level)); } }; @@ -164,23 +167,31 @@ public: this->data(place).sample.merge(tmp_sample); } - Field getResult(ConstAggregateDataPtr place) const + void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const { - size_t size = levels.size(); - Field res = Array(levels.size()); - Array & arr = get(res); - /// Sample может отсортироваться при получении квантиля, но в этом контексте можно не считать это нарушением константности. Sample & sample = const_cast(this->data(place).sample); - if (returns_float) - for (size_t i = 0; i < size; ++i) - arr[i] = Float64(sample.quantileInterpolated(levels[i])); - else - for (size_t i = 0; i < size; ++i) - arr[i] = typename NearestFieldType::Type(sample.quantileInterpolated(levels[i])); + ColumnArray & arr_to = static_cast(to); + ColumnArray::Offsets_t & offsets_to = arr_to.getOffsets(); - return res; + size_t size = levels.size(); + offsets_to.push_back((offsets_to.size() == 0 ? 0 : offsets_to.back()) + size); + + if (returns_float) + { + ColumnFloat64::Container_t & data_to = static_cast(arr_to.getData()).getData(); + + for (size_t i = 0; i < size; ++i) + data_to.push_back(sample.quantileInterpolated(levels[i])); + } + else + { + typename ColumnVector::Container_t & data_to = static_cast &>(arr_to.getData()).getData(); + + for (size_t i = 0; i < size; ++i) + data_to.push_back(sample.quantileInterpolated(levels[i])); + } } }; diff --git a/dbms/include/DB/AggregateFunctions/AggregateFunctionSum.h b/dbms/include/DB/AggregateFunctions/AggregateFunctionSum.h index a74ee06a8f5..33b94326422 100644 --- a/dbms/include/DB/AggregateFunctions/AggregateFunctionSum.h +++ b/dbms/include/DB/AggregateFunctions/AggregateFunctionSum.h @@ -62,9 +62,9 @@ public: this->data(place).sum += tmp; } - Field getResult(ConstAggregateDataPtr place) const + void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const { - return this->data(place).sum; + static_cast::Type> &>(to).getData().push_back(this->data(place).sum); } }; @@ -115,9 +115,9 @@ public: this->data(place).sum += tmp; } - Field getResult(ConstAggregateDataPtr place) const + void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const { - return this->data(place).sum; + static_cast::Type> &>(to).getData().push_back(this->data(place).sum); } }; diff --git a/dbms/include/DB/AggregateFunctions/AggregateFunctionUniq.h b/dbms/include/DB/AggregateFunctions/AggregateFunctionUniq.h index a4e599f55eb..181050398ba 100644 --- a/dbms/include/DB/AggregateFunctions/AggregateFunctionUniq.h +++ b/dbms/include/DB/AggregateFunctions/AggregateFunctionUniq.h @@ -11,6 +11,8 @@ #include #include +#include + #include @@ -90,9 +92,9 @@ public: data(place).set.merge(tmp_set); } - Field getResult(ConstAggregateDataPtr place) const + void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const { - return data(place).set.size(); + static_cast(to).getData().push_back(data(place).set.size()); } }; @@ -120,12 +122,13 @@ public: return new DataTypeString; } - Field getResult(ConstAggregateDataPtr place) const + void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const { - Field res = String(); - WriteBufferFromString wb(get(res)); + String res; + WriteBufferFromString wb(res); this->data(place).set.writeText(wb); - return res; + + static_cast(to).insertDataWithTerminatingZero(res.data(), res.size() + 1); } }; @@ -176,9 +179,9 @@ public: data(place).set.merge(tmp_set); } - Field getResult(ConstAggregateDataPtr place) const + void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const { - return data(place).set.size(); + static_cast(to).getData().push_back(data(place).set.size()); } }; diff --git a/dbms/include/DB/AggregateFunctions/AggregateFunctionsMinMax.h b/dbms/include/DB/AggregateFunctions/AggregateFunctionsMinMax.h index 026edce47a1..4ed044ccebf 100644 --- a/dbms/include/DB/AggregateFunctions/AggregateFunctionsMinMax.h +++ b/dbms/include/DB/AggregateFunctions/AggregateFunctionsMinMax.h @@ -99,9 +99,9 @@ public: type->deserializeBinary(d.value, buf); } - Field getResult(ConstAggregateDataPtr place) const + void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const { - return data(place).value; + to.insert(data(place).value); } }; diff --git a/dbms/include/DB/AggregateFunctions/IAggregateFunction.h b/dbms/include/DB/AggregateFunctions/IAggregateFunction.h index 148e3e0fa6c..675ed118b86 100644 --- a/dbms/include/DB/AggregateFunctions/IAggregateFunction.h +++ b/dbms/include/DB/AggregateFunctions/IAggregateFunction.h @@ -79,8 +79,8 @@ public: /// Десериализовать состояние и объединить своё состояние с ним. virtual void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const = 0; - /// Получить результат - virtual Field getResult(ConstAggregateDataPtr place) const = 0; + /// Вставить результат в столбец. + virtual void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const = 0; }; diff --git a/dbms/include/DB/DataStreams/FinalizingAggregatedBlockInputStream.h b/dbms/include/DB/DataStreams/FinalizingAggregatedBlockInputStream.h index 65972c75ce9..aa408805d5e 100644 --- a/dbms/include/DB/DataStreams/FinalizingAggregatedBlockInputStream.h +++ b/dbms/include/DB/DataStreams/FinalizingAggregatedBlockInputStream.h @@ -56,13 +56,12 @@ protected: ColumnAggregateFunction::Container_t & data = col->getData(); IAggregateFunction * func = col->getFunction(); column.type = func->getReturnType(); - ColumnPtr finalized_column = column.type->createColumn(); - finalized_column->reserve(rows); + column.column = column.type->createColumn(); + IColumn & finalized_column = *column.column; + finalized_column.reserve(rows); for (size_t j = 0; j < rows; ++j) - finalized_column->insert(func->getResult(data[j])); - - column.column = finalized_column; + func->insertResultInto(data[j], finalized_column); } }