From f65305878b59f97296b1cecea0ee72ed6c24aae6 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Thu, 14 May 2020 10:59:14 +0300 Subject: [PATCH] Destructive IAggregateFunction::insertResultInto and ColumnAggregateFunction::convertToValues (#10890) * Destructive IAggregateFunction::insertResultInto and ColumnAggregateFunction::convertToValues * Try fix build. * Try fix build. * Fix build. * Make convertToValues static. * fix build. * Remove const casts. * Added comment. * Fix build. * Fix build. * Add test. * Fix test. --- .../AggregateFunctionAggThrow.cpp | 2 +- .../AggregateFunctionArgMinMax.h | 2 +- .../AggregateFunctionArray.h | 2 +- src/AggregateFunctions/AggregateFunctionAvg.h | 2 +- .../AggregateFunctionBitwise.h | 2 +- .../AggregateFunctionBoundingRatio.h | 2 +- ...egateFunctionCategoricalInformationValue.h | 2 +- .../AggregateFunctionCount.h | 4 ++-- .../AggregateFunctionEntropy.h | 2 +- .../AggregateFunctionForEach.h | 6 ++--- .../AggregateFunctionGroupArray.h | 6 ++--- .../AggregateFunctionGroupArrayInsertAt.h | 2 +- .../AggregateFunctionGroupArrayMoving.h | 2 +- .../AggregateFunctionGroupBitmap.h | 4 ++-- .../AggregateFunctionGroupUniqArray.h | 4 ++-- .../AggregateFunctionHistogram.h | 4 ++-- src/AggregateFunctions/AggregateFunctionIf.h | 2 +- .../AggregateFunctionMLMethod.h | 2 +- .../AggregateFunctionMaxIntersections.h | 4 ++-- .../AggregateFunctionMerge.h | 2 +- .../AggregateFunctionMinMaxAny.h | 2 +- .../AggregateFunctionNothing.h | 2 +- .../AggregateFunctionNull.h | 2 +- .../AggregateFunctionOrFill.h | 2 +- .../AggregateFunctionQuantile.h | 4 ++-- .../AggregateFunctionResample.h | 2 +- .../AggregateFunctionRetention.h | 2 +- .../AggregateFunctionSequenceMatch.h | 6 ++--- .../AggregateFunctionSimpleLinearRegression.h | 2 +- .../AggregateFunctionState.h | 4 ++-- .../AggregateFunctionStatistics.h | 4 ++-- .../AggregateFunctionStatisticsSimple.h | 2 +- src/AggregateFunctions/AggregateFunctionSum.h | 2 +- .../AggregateFunctionSumMap.h | 4 ++-- .../AggregateFunctionTimeSeriesGroupSum.h | 2 +- .../AggregateFunctionTopK.h | 4 ++-- .../AggregateFunctionUniq.h | 4 ++-- .../AggregateFunctionUniqCombined.h | 4 ++-- .../AggregateFunctionUniqUpTo.h | 4 ++-- .../AggregateFunctionWindowFunnel.h | 6 ++--- src/AggregateFunctions/IAggregateFunction.h | 4 +++- src/AggregateFunctions/QuantileTiming.h | 20 ++++++++-------- src/Columns/ColumnAggregateFunction.cpp | 12 +++++++--- src/Columns/ColumnAggregateFunction.h | 8 +++---- src/DataStreams/finalizeBlock.cpp | 5 +++- src/Functions/finalizeAggregation.cpp | 9 ++++---- .../Transforms/TotalsHavingTransform.cpp | 9 ++++++-- .../01246_finalize_aggregation_race.reference | 18 +++++++++++++++ .../01246_finalize_aggregation_race.sql | 23 +++++++++++++++++++ 49 files changed, 144 insertions(+), 86 deletions(-) create mode 100644 tests/queries/0_stateless/01246_finalize_aggregation_race.reference create mode 100644 tests/queries/0_stateless/01246_finalize_aggregation_race.sql diff --git a/src/AggregateFunctions/AggregateFunctionAggThrow.cpp b/src/AggregateFunctions/AggregateFunctionAggThrow.cpp index 2bf00676d77..ea3eb9b1a20 100644 --- a/src/AggregateFunctions/AggregateFunctionAggThrow.cpp +++ b/src/AggregateFunctions/AggregateFunctionAggThrow.cpp @@ -93,7 +93,7 @@ public: buf.read(c); } - void insertResultInto(ConstAggregateDataPtr, IColumn & to) const override + void insertResultInto(AggregateDataPtr, IColumn & to) const override { to.insertDefault(); } diff --git a/src/AggregateFunctions/AggregateFunctionArgMinMax.h b/src/AggregateFunctions/AggregateFunctionArgMinMax.h index 88ad9430bf0..9a0c428d75b 100644 --- a/src/AggregateFunctions/AggregateFunctionArgMinMax.h +++ b/src/AggregateFunctions/AggregateFunctionArgMinMax.h @@ -85,7 +85,7 @@ public: return Data::allocatesMemoryInArena(); } - void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to) const override { this->data(place).result.insertResultInto(to); } diff --git a/src/AggregateFunctions/AggregateFunctionArray.h b/src/AggregateFunctions/AggregateFunctionArray.h index cc4d5ffebb2..4fe5e459ae1 100644 --- a/src/AggregateFunctions/AggregateFunctionArray.h +++ b/src/AggregateFunctions/AggregateFunctionArray.h @@ -119,7 +119,7 @@ public: nested_func->deserialize(place, buf, arena); } - void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to) const override { nested_func->insertResultInto(place, to); } diff --git a/src/AggregateFunctions/AggregateFunctionAvg.h b/src/AggregateFunctions/AggregateFunctionAvg.h index a269dd74ad5..d9ef8647b82 100644 --- a/src/AggregateFunctions/AggregateFunctionAvg.h +++ b/src/AggregateFunctions/AggregateFunctionAvg.h @@ -80,7 +80,7 @@ public: readBinary(this->data(place).denominator, buf); } - void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to) const override { auto & column = static_cast(to); column.getData().push_back(this->data(place).template result()); diff --git a/src/AggregateFunctions/AggregateFunctionBitwise.h b/src/AggregateFunctions/AggregateFunctionBitwise.h index 29afa7db8d5..a4e5f7ddafa 100644 --- a/src/AggregateFunctions/AggregateFunctionBitwise.h +++ b/src/AggregateFunctions/AggregateFunctionBitwise.h @@ -74,7 +74,7 @@ public: readBinary(this->data(place).value, buf); } - void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to) const override { assert_cast &>(to).getData().push_back(this->data(place).value); } diff --git a/src/AggregateFunctions/AggregateFunctionBoundingRatio.h b/src/AggregateFunctions/AggregateFunctionBoundingRatio.h index 8451d6532f6..81846db4bac 100644 --- a/src/AggregateFunctions/AggregateFunctionBoundingRatio.h +++ b/src/AggregateFunctions/AggregateFunctionBoundingRatio.h @@ -150,7 +150,7 @@ public: data(place).deserialize(buf); } - void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to) const override { assert_cast(to).getData().push_back(getBoundingRatio(data(place))); } diff --git a/src/AggregateFunctions/AggregateFunctionCategoricalInformationValue.h b/src/AggregateFunctions/AggregateFunctionCategoricalInformationValue.h index 25e5c2d1f1a..1c397c26631 100644 --- a/src/AggregateFunctions/AggregateFunctionCategoricalInformationValue.h +++ b/src/AggregateFunctions/AggregateFunctionCategoricalInformationValue.h @@ -118,7 +118,7 @@ public: } void insertResultInto( - ConstAggregateDataPtr place, + AggregateDataPtr place, IColumn & to ) const override { diff --git a/src/AggregateFunctions/AggregateFunctionCount.h b/src/AggregateFunctions/AggregateFunctionCount.h index 3e153d89d01..092ffc6b6cf 100644 --- a/src/AggregateFunctions/AggregateFunctionCount.h +++ b/src/AggregateFunctions/AggregateFunctionCount.h @@ -57,7 +57,7 @@ public: readVarUInt(data(place).count, buf); } - void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to) const override { assert_cast(to).getData().push_back(data(place).count); } @@ -108,7 +108,7 @@ public: readVarUInt(data(place).count, buf); } - void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to) const override { assert_cast(to).getData().push_back(data(place).count); } diff --git a/src/AggregateFunctions/AggregateFunctionEntropy.h b/src/AggregateFunctions/AggregateFunctionEntropy.h index 942de8ffe98..7586cebd8ec 100644 --- a/src/AggregateFunctions/AggregateFunctionEntropy.h +++ b/src/AggregateFunctions/AggregateFunctionEntropy.h @@ -140,7 +140,7 @@ public: this->data(place).deserialize(buf); } - void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to) const override { auto & column = assert_cast &>(to); column.getData().push_back(this->data(place).get()); diff --git a/src/AggregateFunctions/AggregateFunctionForEach.h b/src/AggregateFunctions/AggregateFunctionForEach.h index 94e60c75382..23a3487de47 100644 --- a/src/AggregateFunctions/AggregateFunctionForEach.h +++ b/src/AggregateFunctions/AggregateFunctionForEach.h @@ -225,15 +225,15 @@ public: } } - void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to) const override { - const AggregateFunctionForEachData & state = data(place); + AggregateFunctionForEachData & state = data(place); ColumnArray & arr_to = assert_cast(to); ColumnArray::Offsets & offsets_to = arr_to.getOffsets(); IColumn & elems_to = arr_to.getData(); - const char * nested_state = state.array_of_aggregate_datas; + char * nested_state = state.array_of_aggregate_datas; for (size_t i = 0; i < state.dynamic_array_size; ++i) { nested_func->insertResultInto(nested_state, elems_to); diff --git a/src/AggregateFunctions/AggregateFunctionGroupArray.h b/src/AggregateFunctions/AggregateFunctionGroupArray.h index 2d345cff1f7..b76efd9f6c2 100644 --- a/src/AggregateFunctions/AggregateFunctionGroupArray.h +++ b/src/AggregateFunctions/AggregateFunctionGroupArray.h @@ -282,7 +282,7 @@ public: // if constexpr (Trait::sampler == Sampler::DETERMINATOR) } - void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to) const override { const auto & value = this->data(place).value; size_t size = value.size(); @@ -600,7 +600,7 @@ public: // if constexpr (Trait::sampler == Sampler::DETERMINATOR) } - void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to) const override { auto & column_array = assert_cast(to); @@ -815,7 +815,7 @@ public: data(place).last = prev; } - void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to) const override { auto & column_array = assert_cast(to); diff --git a/src/AggregateFunctions/AggregateFunctionGroupArrayInsertAt.h b/src/AggregateFunctions/AggregateFunctionGroupArrayInsertAt.h index 395d13f7d34..0eec38c51a7 100644 --- a/src/AggregateFunctions/AggregateFunctionGroupArrayInsertAt.h +++ b/src/AggregateFunctions/AggregateFunctionGroupArrayInsertAt.h @@ -179,7 +179,7 @@ public: } } - void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to) const override { ColumnArray & to_array = assert_cast(to); IColumn & to_data = to_array.getData(); diff --git a/src/AggregateFunctions/AggregateFunctionGroupArrayMoving.h b/src/AggregateFunctions/AggregateFunctionGroupArrayMoving.h index 2c5c56999b2..8f93a7eb25a 100644 --- a/src/AggregateFunctions/AggregateFunctionGroupArrayMoving.h +++ b/src/AggregateFunctions/AggregateFunctionGroupArrayMoving.h @@ -158,7 +158,7 @@ public: this->data(place).sum = value.back(); } - void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to) const override { const auto & data = this->data(place); size_t size = data.value.size(); diff --git a/src/AggregateFunctions/AggregateFunctionGroupBitmap.h b/src/AggregateFunctions/AggregateFunctionGroupBitmap.h index 56901e28e01..766479cc08d 100644 --- a/src/AggregateFunctions/AggregateFunctionGroupBitmap.h +++ b/src/AggregateFunctions/AggregateFunctionGroupBitmap.h @@ -48,7 +48,7 @@ public: this->data(place).rbs.read(buf); } - void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to) const override { assert_cast &>(to).getData().push_back(this->data(place).rbs.size()); } @@ -113,7 +113,7 @@ public: this->data(place).rbs.read(buf); } - void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to) const override { assert_cast &>(to).getData().push_back(this->data(place).rbs.size()); } diff --git a/src/AggregateFunctions/AggregateFunctionGroupUniqArray.h b/src/AggregateFunctions/AggregateFunctionGroupUniqArray.h index b437bb2d7bf..9dbf2c921c2 100644 --- a/src/AggregateFunctions/AggregateFunctionGroupUniqArray.h +++ b/src/AggregateFunctions/AggregateFunctionGroupUniqArray.h @@ -102,7 +102,7 @@ public: this->data(place).value.read(buf); } - void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to) const override { ColumnArray & arr_to = assert_cast(to); ColumnArray::Offsets & offsets_to = arr_to.getOffsets(); @@ -241,7 +241,7 @@ public: } } - void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to) const override { ColumnArray & arr_to = assert_cast(to); ColumnArray::Offsets & offsets_to = arr_to.getOffsets(); diff --git a/src/AggregateFunctions/AggregateFunctionHistogram.h b/src/AggregateFunctions/AggregateFunctionHistogram.h index 96ee01652de..8eaa42fdac4 100644 --- a/src/AggregateFunctions/AggregateFunctionHistogram.h +++ b/src/AggregateFunctions/AggregateFunctionHistogram.h @@ -353,9 +353,9 @@ public: this->data(place).read(buf, max_bins); } - void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to) const override { - auto & data = this->data(const_cast(place)); + auto & data = this->data(place); auto & to_array = assert_cast(to); ColumnArray::Offsets & offsets_to = to_array.getOffsets(); diff --git a/src/AggregateFunctions/AggregateFunctionIf.h b/src/AggregateFunctions/AggregateFunctionIf.h index e33fb1df53d..bf4f0b24de3 100644 --- a/src/AggregateFunctions/AggregateFunctionIf.h +++ b/src/AggregateFunctions/AggregateFunctionIf.h @@ -95,7 +95,7 @@ public: nested_func->deserialize(place, buf, arena); } - void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to) const override { nested_func->insertResultInto(place, to); } diff --git a/src/AggregateFunctions/AggregateFunctionMLMethod.h b/src/AggregateFunctions/AggregateFunctionMLMethod.h index bb241074a1e..ce4ef98e0cf 100644 --- a/src/AggregateFunctions/AggregateFunctionMLMethod.h +++ b/src/AggregateFunctions/AggregateFunctionMLMethod.h @@ -389,7 +389,7 @@ public: /** This function is called if aggregate function without State modifier is selected in a query. * Inserts all weights of the model into the column 'to', so user may use such information if needed */ - void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to) const override { this->data(place).returnWeights(to); } diff --git a/src/AggregateFunctions/AggregateFunctionMaxIntersections.h b/src/AggregateFunctions/AggregateFunctionMaxIntersections.h index 13ed6ae42fe..050c5fd78ea 100644 --- a/src/AggregateFunctions/AggregateFunctionMaxIntersections.h +++ b/src/AggregateFunctions/AggregateFunctionMaxIntersections.h @@ -129,14 +129,14 @@ public: buf.read(reinterpret_cast(value.data()), size * sizeof(value[0])); } - void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to) const override { Int64 current_intersections = 0; Int64 max_intersections = 0; PointType position_of_max_intersections = 0; /// const_cast because we will sort the array - auto & array = const_cast::Array &>(this->data(place).value); + auto & array = this->data(place).value; /// Sort by position; for equal position, sort by weight to get deterministic result. std::sort(array.begin(), array.end()); diff --git a/src/AggregateFunctions/AggregateFunctionMerge.h b/src/AggregateFunctions/AggregateFunctionMerge.h index 2e803211e8f..51a3c11118f 100644 --- a/src/AggregateFunctions/AggregateFunctionMerge.h +++ b/src/AggregateFunctions/AggregateFunctionMerge.h @@ -93,7 +93,7 @@ public: nested_func->deserialize(place, buf, arena); } - void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to) const override { nested_func->insertResultInto(place, to); } diff --git a/src/AggregateFunctions/AggregateFunctionMinMaxAny.h b/src/AggregateFunctions/AggregateFunctionMinMaxAny.h index e2f204e1c07..69504f7b249 100644 --- a/src/AggregateFunctions/AggregateFunctionMinMaxAny.h +++ b/src/AggregateFunctions/AggregateFunctionMinMaxAny.h @@ -746,7 +746,7 @@ public: return Data::allocatesMemoryInArena(); } - void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to) const override { this->data(place).insertResultInto(to); } diff --git a/src/AggregateFunctions/AggregateFunctionNothing.h b/src/AggregateFunctions/AggregateFunctionNothing.h index d9c8f9cea19..511dbbecd38 100644 --- a/src/AggregateFunctions/AggregateFunctionNothing.h +++ b/src/AggregateFunctions/AggregateFunctionNothing.h @@ -67,7 +67,7 @@ public: { } - void insertResultInto(ConstAggregateDataPtr, IColumn & to) const override + void insertResultInto(AggregateDataPtr, IColumn & to) const override { to.insertDefault(); } diff --git a/src/AggregateFunctions/AggregateFunctionNull.h b/src/AggregateFunctions/AggregateFunctionNull.h index a0fe96b6f62..e5309e1300a 100644 --- a/src/AggregateFunctions/AggregateFunctionNull.h +++ b/src/AggregateFunctions/AggregateFunctionNull.h @@ -146,7 +146,7 @@ public: } } - void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to) const override { if (result_is_nullable) { diff --git a/src/AggregateFunctions/AggregateFunctionOrFill.h b/src/AggregateFunctions/AggregateFunctionOrFill.h index be085da9c44..1bbf2ea3135 100644 --- a/src/AggregateFunctions/AggregateFunctionOrFill.h +++ b/src/AggregateFunctions/AggregateFunctionOrFill.h @@ -147,7 +147,7 @@ public: } void insertResultInto( - ConstAggregateDataPtr place, + AggregateDataPtr place, IColumn & to) const override { if (place[size_of_data]) diff --git a/src/AggregateFunctions/AggregateFunctionQuantile.h b/src/AggregateFunctions/AggregateFunctionQuantile.h index f85eb15b3ab..cc90a22da81 100644 --- a/src/AggregateFunctions/AggregateFunctionQuantile.h +++ b/src/AggregateFunctions/AggregateFunctionQuantile.h @@ -138,10 +138,10 @@ public: this->data(place).deserialize(buf); } - void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to) const override { /// const_cast is required because some data structures apply finalizaton (like sorting) for obtain a result. - auto & data = this->data(const_cast(place)); + auto & data = this->data(place); if constexpr (returns_many) { diff --git a/src/AggregateFunctions/AggregateFunctionResample.h b/src/AggregateFunctions/AggregateFunctionResample.h index 0f348899884..49cc312287e 100644 --- a/src/AggregateFunctions/AggregateFunctionResample.h +++ b/src/AggregateFunctions/AggregateFunctionResample.h @@ -173,7 +173,7 @@ public: } void insertResultInto( - ConstAggregateDataPtr place, + AggregateDataPtr place, IColumn & to) const override { auto & col = assert_cast(to); diff --git a/src/AggregateFunctions/AggregateFunctionRetention.h b/src/AggregateFunctions/AggregateFunctionRetention.h index dd75007726e..3a76ba9f055 100644 --- a/src/AggregateFunctions/AggregateFunctionRetention.h +++ b/src/AggregateFunctions/AggregateFunctionRetention.h @@ -123,7 +123,7 @@ public: this->data(place).deserialize(buf); } - void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to) const override { auto & data_to = assert_cast(assert_cast(to).getData()).getData(); auto & offsets_to = assert_cast(to).getOffsets(); diff --git a/src/AggregateFunctions/AggregateFunctionSequenceMatch.h b/src/AggregateFunctions/AggregateFunctionSequenceMatch.h index 5d4a7098951..416786f8fcb 100644 --- a/src/AggregateFunctions/AggregateFunctionSequenceMatch.h +++ b/src/AggregateFunctions/AggregateFunctionSequenceMatch.h @@ -560,9 +560,9 @@ public: DataTypePtr getReturnType() const override { return std::make_shared(); } - void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to) const override { - const_cast(this->data(place)).sort(); + this->data(place).sort(); const auto & data_ref = this->data(place); @@ -588,7 +588,7 @@ public: DataTypePtr getReturnType() const override { return std::make_shared(); } - void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to) const override { const_cast(this->data(place)).sort(); assert_cast(to).getData().push_back(count(place)); diff --git a/src/AggregateFunctions/AggregateFunctionSimpleLinearRegression.h b/src/AggregateFunctions/AggregateFunctionSimpleLinearRegression.h index db4e57c0c6c..d1405172e27 100644 --- a/src/AggregateFunctions/AggregateFunctionSimpleLinearRegression.h +++ b/src/AggregateFunctions/AggregateFunctionSimpleLinearRegression.h @@ -169,7 +169,7 @@ public: } void insertResultInto( - ConstAggregateDataPtr place, + AggregateDataPtr place, IColumn & to ) const override { diff --git a/src/AggregateFunctions/AggregateFunctionState.h b/src/AggregateFunctions/AggregateFunctionState.h index 8879a324827..126d63573af 100644 --- a/src/AggregateFunctions/AggregateFunctionState.h +++ b/src/AggregateFunctions/AggregateFunctionState.h @@ -80,9 +80,9 @@ public: nested_func->deserialize(place, buf, arena); } - void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to) const override { - assert_cast(to).getData().push_back(const_cast(place)); + assert_cast(to).getData().push_back(place); } /// Aggregate function or aggregate function state. diff --git a/src/AggregateFunctions/AggregateFunctionStatistics.h b/src/AggregateFunctions/AggregateFunctionStatistics.h index c691559b3f2..7f6de43f5e1 100644 --- a/src/AggregateFunctions/AggregateFunctionStatistics.h +++ b/src/AggregateFunctions/AggregateFunctionStatistics.h @@ -143,7 +143,7 @@ public: this->data(place).deserialize(buf); } - void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to) const override { this->data(place).publish(to); } @@ -395,7 +395,7 @@ public: this->data(place).deserialize(buf); } - void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to) const override { this->data(place).publish(to); } diff --git a/src/AggregateFunctions/AggregateFunctionStatisticsSimple.h b/src/AggregateFunctions/AggregateFunctionStatisticsSimple.h index 185ef3594dd..72c2ce014e4 100644 --- a/src/AggregateFunctions/AggregateFunctionStatisticsSimple.h +++ b/src/AggregateFunctions/AggregateFunctionStatisticsSimple.h @@ -445,7 +445,7 @@ public: this->data(place).read(buf); } - void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to) const override { const auto & data = this->data(place); auto & dst = static_cast(to).getData(); diff --git a/src/AggregateFunctions/AggregateFunctionSum.h b/src/AggregateFunctions/AggregateFunctionSum.h index 46b90555c27..e9a6e50d9ef 100644 --- a/src/AggregateFunctions/AggregateFunctionSum.h +++ b/src/AggregateFunctions/AggregateFunctionSum.h @@ -156,7 +156,7 @@ public: this->data(place).read(buf); } - void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to) const override { auto & column = static_cast(to); column.getData().push_back(this->data(place).get()); diff --git a/src/AggregateFunctions/AggregateFunctionSumMap.h b/src/AggregateFunctions/AggregateFunctionSumMap.h index 581f4bba0f9..e2aef611955 100644 --- a/src/AggregateFunctions/AggregateFunctionSumMap.h +++ b/src/AggregateFunctions/AggregateFunctionSumMap.h @@ -242,10 +242,10 @@ public: } } - void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to) const override { // Final step does compaction of keys that have zero values, this mutates the state - auto & merged_maps = this->data(const_cast(place)).merged_maps; + auto & merged_maps = this->data(place).merged_maps; for (auto it = merged_maps.cbegin(); it != merged_maps.cend();) { // Key is not compacted if it has at least one non-zero value diff --git a/src/AggregateFunctions/AggregateFunctionTimeSeriesGroupSum.h b/src/AggregateFunctions/AggregateFunctionTimeSeriesGroupSum.h index 7dd453f5e87..ad83324e483 100644 --- a/src/AggregateFunctions/AggregateFunctionTimeSeriesGroupSum.h +++ b/src/AggregateFunctions/AggregateFunctionTimeSeriesGroupSum.h @@ -253,7 +253,7 @@ public: void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override { this->data(place).deserialize(buf); } - void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to) const override { const auto & value = this->data(place).result; size_t size = value.size(); diff --git a/src/AggregateFunctions/AggregateFunctionTopK.h b/src/AggregateFunctions/AggregateFunctionTopK.h index dec6baf6ed3..9c5e62bb6d7 100644 --- a/src/AggregateFunctions/AggregateFunctionTopK.h +++ b/src/AggregateFunctions/AggregateFunctionTopK.h @@ -84,7 +84,7 @@ public: set.read(buf); } - void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to) const override { ColumnArray & arr_to = assert_cast(to); ColumnArray::Offsets & offsets_to = arr_to.getOffsets(); @@ -211,7 +211,7 @@ public: this->data(place).value.merge(this->data(rhs).value); } - void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to) const override { ColumnArray & arr_to = assert_cast(to); ColumnArray::Offsets & offsets_to = arr_to.getOffsets(); diff --git a/src/AggregateFunctions/AggregateFunctionUniq.h b/src/AggregateFunctions/AggregateFunctionUniq.h index 9e869435ce0..334e809ebe7 100644 --- a/src/AggregateFunctions/AggregateFunctionUniq.h +++ b/src/AggregateFunctions/AggregateFunctionUniq.h @@ -240,7 +240,7 @@ public: this->data(place).set.read(buf); } - void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to) const override { assert_cast(to).getData().push_back(this->data(place).set.size()); } @@ -294,7 +294,7 @@ public: this->data(place).set.read(buf); } - void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to) const override { assert_cast(to).getData().push_back(this->data(place).set.size()); } diff --git a/src/AggregateFunctions/AggregateFunctionUniqCombined.h b/src/AggregateFunctions/AggregateFunctionUniqCombined.h index 44d92b72365..a92caa4a551 100644 --- a/src/AggregateFunctions/AggregateFunctionUniqCombined.h +++ b/src/AggregateFunctions/AggregateFunctionUniqCombined.h @@ -167,7 +167,7 @@ public: this->data(place).set.read(buf); } - void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to) const override { assert_cast(to).getData().push_back(this->data(place).set.size()); } @@ -229,7 +229,7 @@ public: this->data(place).set.read(buf); } - void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to) const override { assert_cast(to).getData().push_back(this->data(place).set.size()); } diff --git a/src/AggregateFunctions/AggregateFunctionUniqUpTo.h b/src/AggregateFunctions/AggregateFunctionUniqUpTo.h index f16a7cc475e..4c71215141c 100644 --- a/src/AggregateFunctions/AggregateFunctionUniqUpTo.h +++ b/src/AggregateFunctions/AggregateFunctionUniqUpTo.h @@ -180,7 +180,7 @@ public: this->data(place).read(buf, threshold); } - void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to) const override { assert_cast(to).getData().push_back(this->data(place).size()); } @@ -242,7 +242,7 @@ public: this->data(place).read(buf, threshold); } - void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to) const override { assert_cast(to).getData().push_back(this->data(place).size()); } diff --git a/src/AggregateFunctions/AggregateFunctionWindowFunnel.h b/src/AggregateFunctions/AggregateFunctionWindowFunnel.h index 3a1d2adee4a..726656d1ca8 100644 --- a/src/AggregateFunctions/AggregateFunctionWindowFunnel.h +++ b/src/AggregateFunctions/AggregateFunctionWindowFunnel.h @@ -151,14 +151,14 @@ private: // The level path must be 1---2---3---...---check_events_size, find the max event level that statisfied the path in the sliding window. // If found, returns the max event level, else return 0. // The Algorithm complexity is O(n). - UInt8 getEventLevel(const Data & data) const + UInt8 getEventLevel(Data & data) const { if (data.size() == 0) return 0; if (!strict_order && events_size == 1) return 1; - const_cast(data).sort(); + data.sort(); /// events_timestamp stores the timestamp that latest i-th level event happen withing time window after previous level event. /// timestamp defaults to -1, which unsigned timestamp value never meet @@ -279,7 +279,7 @@ public: this->data(place).deserialize(buf); } - void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to) const override { assert_cast(to).getData().push_back(getEventLevel(this->data(place))); } diff --git a/src/AggregateFunctions/IAggregateFunction.h b/src/AggregateFunctions/IAggregateFunction.h index 48d31793a5a..ad074feffc5 100644 --- a/src/AggregateFunctions/IAggregateFunction.h +++ b/src/AggregateFunctions/IAggregateFunction.h @@ -104,7 +104,9 @@ public: } /// Inserts results into a column. - virtual void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const = 0; + /// This method must be called once, from single thread. + /// After this method was called for state, you can't do anything with state but destroy. + virtual void insertResultInto(AggregateDataPtr place, IColumn & to) const = 0; /// Used for machine learning methods. Predict result from trained model. /// Will insert result into `to` column for rows in range [offset, offset + limit). diff --git a/src/AggregateFunctions/QuantileTiming.h b/src/AggregateFunctions/QuantileTiming.h index d7f425ee2d7..2ab8c866615 100644 --- a/src/AggregateFunctions/QuantileTiming.h +++ b/src/AggregateFunctions/QuantileTiming.h @@ -167,7 +167,7 @@ namespace detail buf.readStrict(reinterpret_cast(elems.data()), size * sizeof(elems[0])); } - UInt16 get(double level) const + UInt16 get(double level) { UInt16 quantile = 0; @@ -178,7 +178,7 @@ namespace detail : (elems.size() - 1); /// Sorting an array will not be considered a violation of constancy. - auto & array = const_cast(elems); + auto & array = elems; std::nth_element(array.begin(), array.begin() + n, array.end()); quantile = array[n]; } @@ -187,10 +187,10 @@ namespace detail } template - void getMany(const double * levels, const size_t * levels_permutation, size_t size, ResultType * result) const + void getMany(const double * levels, const size_t * levels_permutation, size_t size, ResultType * result) { size_t prev_n = 0; - auto & array = const_cast(elems); + auto & array = elems; for (size_t i = 0; i < size; ++i) { auto level_index = levels_permutation[i]; @@ -208,14 +208,14 @@ namespace detail } /// Same, but in the case of an empty state, NaN is returned. - float getFloat(double level) const + float getFloat(double level) { return !elems.empty() ? get(level) : std::numeric_limits::quiet_NaN(); } - void getManyFloat(const double * levels, const size_t * levels_permutation, size_t size, float * result) const + void getManyFloat(const double * levels, const size_t * levels_permutation, size_t size, float * result) { if (!elems.empty()) getMany(levels, levels_permutation, size, result); @@ -707,7 +707,7 @@ public: } /// Get the value of the `level` quantile. The level must be between 0 and 1. - UInt16 get(double level) const + UInt16 get(double level) { Kind kind = which(); @@ -728,7 +728,7 @@ public: /// Get the size values of the quantiles of the `levels` levels. Record `size` results starting with `result` address. template - void getMany(const double * levels, const size_t * levels_permutation, size_t size, ResultType * result) const + void getMany(const double * levels, const size_t * levels_permutation, size_t size, ResultType * result) { Kind kind = which(); @@ -748,14 +748,14 @@ public: } /// The same, but in the case of an empty state, NaN is returned. - float getFloat(double level) const + float getFloat(double level) { return tiny.count ? get(level) : std::numeric_limits::quiet_NaN(); } - void getManyFloat(const double * levels, const size_t * levels_permutation, size_t size, float * result) const + void getManyFloat(const double * levels, const size_t * levels_permutation, size_t size, float * result) { if (tiny.count) getMany(levels, levels_permutation, size, result); diff --git a/src/Columns/ColumnAggregateFunction.cpp b/src/Columns/ColumnAggregateFunction.cpp index e52d3e5303f..2f3a766b8f5 100644 --- a/src/Columns/ColumnAggregateFunction.cpp +++ b/src/Columns/ColumnAggregateFunction.cpp @@ -39,7 +39,7 @@ void ColumnAggregateFunction::addArena(ConstArenaPtr arena_) foreign_arenas.push_back(arena_); } -MutableColumnPtr ColumnAggregateFunction::convertToValues() const +MutableColumnPtr ColumnAggregateFunction::convertToValues(MutableColumnPtr column) { /** If the aggregate function returns an unfinalized/unfinished state, * then you just need to copy pointers to it and also shared ownership of data. @@ -65,20 +65,26 @@ MutableColumnPtr ColumnAggregateFunction::convertToValues() const * `AggregateFunction(quantileTimingState(0.5), UInt64)` * into `AggregateFunction(quantileTiming(0.5), UInt64)` * - in the same states. - * + *column_aggregate_func * Then `finalizeAggregation` function will be calculated, which will call `convertToValues` already on the result. * And this converts a column of type * AggregateFunction(quantileTiming(0.5), UInt64) * into UInt16 - already finished result of `quantileTiming`. */ + auto & column_aggregate_func = assert_cast(*column); + auto & func = column_aggregate_func.func; + auto & data = column_aggregate_func.data; + if (const AggregateFunctionState *function_state = typeid_cast(func.get())) { - auto res = createView(); + auto res = column_aggregate_func.createView(); res->set(function_state->getNestedFunction()); res->data.assign(data.begin(), data.end()); return res; } + column_aggregate_func.ensureOwnership(); + MutableColumnPtr res = func->getReturnType()->createColumn(); res->reserve(data.size()); diff --git a/src/Columns/ColumnAggregateFunction.h b/src/Columns/ColumnAggregateFunction.h index 4fd75674fe7..f257351a4d0 100644 --- a/src/Columns/ColumnAggregateFunction.h +++ b/src/Columns/ColumnAggregateFunction.h @@ -114,14 +114,14 @@ public: /// Take shared ownership of Arena, that holds memory for states of aggregate functions. void addArena(ConstArenaPtr arena_); - /** Transform column with states of aggregate functions to column with final result values. - */ - MutableColumnPtr convertToValues() const; + /// Transform column with states of aggregate functions to column with final result values. + /// It expects ColumnAggregateFunction as an argument, this column will be destroyed. + /// This method is made static and receive MutableColumnPtr object to explicitly destroy it. + static MutableColumnPtr convertToValues(MutableColumnPtr column); std::string getName() const override { return "AggregateFunction(" + func->getName() + ")"; } const char * getFamilyName() const override { return "AggregateFunction"; } - bool tryFinalizeAggregateFunction(MutableColumnPtr* res_) const; MutableColumnPtr predictValues(Block & block, const ColumnNumbers & arguments, const Context & context) const; size_t size() const override diff --git a/src/DataStreams/finalizeBlock.cpp b/src/DataStreams/finalizeBlock.cpp index 50fbaf2bfe1..144f1a28129 100644 --- a/src/DataStreams/finalizeBlock.cpp +++ b/src/DataStreams/finalizeBlock.cpp @@ -17,7 +17,10 @@ namespace DB { current.type = unfinalized_type->getReturnType(); if (current.column) - current.column = typeid_cast(*current.column).convertToValues(); + { + auto mut_column = (*std::move(current.column)).mutate(); + current.column = ColumnAggregateFunction::convertToValues(std::move(mut_column)); + } } } } diff --git a/src/Functions/finalizeAggregation.cpp b/src/Functions/finalizeAggregation.cpp index 70642722a04..66c6268841b 100644 --- a/src/Functions/finalizeAggregation.cpp +++ b/src/Functions/finalizeAggregation.cpp @@ -57,15 +57,16 @@ public: void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t /*input_rows_count*/) override { - const ColumnAggregateFunction * column_with_states - = typeid_cast(&*block.getByPosition(arguments.at(0)).column); - if (!column_with_states) + auto column = block.getByPosition(arguments.at(0)).column; + if (!typeid_cast(column.get())) throw Exception("Illegal column " + block.getByPosition(arguments.at(0)).column->getName() + " of first argument of function " + getName(), ErrorCodes::ILLEGAL_COLUMN); - block.getByPosition(result).column = column_with_states->convertToValues(); + /// Column is copied here, because there is no guarantee that we own it. + auto mut_column = (*std::move(column)).mutate(); + block.getByPosition(result).column = ColumnAggregateFunction::convertToValues(std::move(mut_column)); } }; diff --git a/src/Processors/Transforms/TotalsHavingTransform.cpp b/src/Processors/Transforms/TotalsHavingTransform.cpp index 6a21c5aabf0..451415f7133 100644 --- a/src/Processors/Transforms/TotalsHavingTransform.cpp +++ b/src/Processors/Transforms/TotalsHavingTransform.cpp @@ -21,8 +21,13 @@ void finalizeChunk(Chunk & chunk) auto columns = chunk.detachColumns(); for (auto & column : columns) - if (const auto * agg_function = typeid_cast(column.get())) - column = agg_function->convertToValues(); + { + if (typeid_cast(column.get())) + { + auto mut_column = (*std::move(column)).mutate(); + column = ColumnAggregateFunction::convertToValues(std::move(mut_column)); + } + } chunk.setColumns(std::move(columns), num_rows); } diff --git a/tests/queries/0_stateless/01246_finalize_aggregation_race.reference b/tests/queries/0_stateless/01246_finalize_aggregation_race.reference new file mode 100644 index 00000000000..f4d508c87cb --- /dev/null +++ b/tests/queries/0_stateless/01246_finalize_aggregation_race.reference @@ -0,0 +1,18 @@ +200 +200 +200 +200 +200 +200 +200 +200 +200 +200 +200 +200 +200 +200 +200 +200 +200 +200 diff --git a/tests/queries/0_stateless/01246_finalize_aggregation_race.sql b/tests/queries/0_stateless/01246_finalize_aggregation_race.sql new file mode 100644 index 00000000000..336fe6bcfea --- /dev/null +++ b/tests/queries/0_stateless/01246_finalize_aggregation_race.sql @@ -0,0 +1,23 @@ +drop table if exists test_quantile; +create table test_quantile (x AggregateFunction(quantileTiming(0.2), UInt64)) engine = Memory; +insert into test_quantile select medianTimingState(.2)(number) from (select * from numbers(1000) order by number desc); +select y from ( +select finalizeAggregation(x) as y from test_quantile union all +select finalizeAggregation(x) as y from test_quantile union all +select finalizeAggregation(x) as y from test_quantile union all +select finalizeAggregation(x) as y from test_quantile union all +select finalizeAggregation(x) as y from test_quantile union all +select finalizeAggregation(x) as y from test_quantile union all +select finalizeAggregation(x) as y from test_quantile union all +select finalizeAggregation(x) as y from test_quantile union all +select finalizeAggregation(x) as y from test_quantile union all +select finalizeAggregation(x) as y from test_quantile union all +select finalizeAggregation(x) as y from test_quantile union all +select finalizeAggregation(x) as y from test_quantile union all +select finalizeAggregation(x) as y from test_quantile union all +select finalizeAggregation(x) as y from test_quantile union all +select finalizeAggregation(x) as y from test_quantile union all +select finalizeAggregation(x) as y from test_quantile union all +select finalizeAggregation(x) as y from test_quantile union all +select finalizeAggregation(x) as y from test_quantile) +order by y;