From be9527f09f82b514adc8f66c64a5b6b98ad36fd8 Mon Sep 17 00:00:00 2001 From: Sergey Fedorov Date: Thu, 22 May 2014 18:09:10 +0400 Subject: [PATCH] dbms: more read write functions for DataTypeAggregateFunction and ColumnAggregateFunction [METR-10894] --- .../AggregateFunctionMerge.h | 2 +- .../AggregateFunctionState.h | 2 +- .../AggregateFunctions/AggregateFunctionSum.h | 12 +++++ .../AggregateFunctionsMinMax.h | 36 +++++++++++++++ .../AggregateFunctions/IAggregateFunction.h | 14 +++++- .../DB/Columns/ColumnAggregateFunction.h | 44 +++++++++++-------- .../DB/DataTypes/DataTypeAggregateFunction.h | 2 +- .../DataTypes/DataTypeAggregateFunction.cpp | 34 ++++++++++---- dbms/src/Interpreters/Aggregator.cpp | 40 ++++++----------- 9 files changed, 128 insertions(+), 58 deletions(-) diff --git a/dbms/include/DB/AggregateFunctions/AggregateFunctionMerge.h b/dbms/include/DB/AggregateFunctions/AggregateFunctionMerge.h index c89b906959e..96696f5ae82 100644 --- a/dbms/include/DB/AggregateFunctions/AggregateFunctionMerge.h +++ b/dbms/include/DB/AggregateFunctions/AggregateFunctionMerge.h @@ -83,7 +83,7 @@ public: { Field field; columns[0]->get(row_num, field); - ReadBufferFromString read_buffer(field.safeGet()); + ReadBufferFromString read_buffer(field.safeGet()); nested_func->deserializeMerge(place, read_buffer); } diff --git a/dbms/include/DB/AggregateFunctions/AggregateFunctionState.h b/dbms/include/DB/AggregateFunctions/AggregateFunctionState.h index 2ab1b9afdf9..b5204ca959a 100644 --- a/dbms/include/DB/AggregateFunctions/AggregateFunctionState.h +++ b/dbms/include/DB/AggregateFunctions/AggregateFunctionState.h @@ -101,7 +101,7 @@ public: } /// Для аггрегатных функции типа state никогда не нужно вызывать insertResultInto - bool isFinal() const { return false; } + bool canBeFinal() const { return false; } }; diff --git a/dbms/include/DB/AggregateFunctions/AggregateFunctionSum.h b/dbms/include/DB/AggregateFunctions/AggregateFunctionSum.h index 15f5db479ab..e591ab57be7 100644 --- a/dbms/include/DB/AggregateFunctions/AggregateFunctionSum.h +++ b/dbms/include/DB/AggregateFunctions/AggregateFunctionSum.h @@ -62,6 +62,18 @@ public: this->data(place).sum += tmp; } + void serializeText(ConstAggregateDataPtr place, WriteBuffer & buf) const + { + writeText(this->data(place).sum, buf); + } + + void deserializeMergeText(AggregateDataPtr place, ReadBuffer & buf) const + { + typename NearestFieldType::Type tmp; + readText(tmp, buf); + this->data(place).sum += tmp; + } + void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const { static_cast::Type> &>(to).getData().push_back(this->data(place).sum); diff --git a/dbms/include/DB/AggregateFunctions/AggregateFunctionsMinMax.h b/dbms/include/DB/AggregateFunctions/AggregateFunctionsMinMax.h index 4c56ef00e59..794c551dafc 100644 --- a/dbms/include/DB/AggregateFunctions/AggregateFunctionsMinMax.h +++ b/dbms/include/DB/AggregateFunctions/AggregateFunctionsMinMax.h @@ -116,6 +116,42 @@ public: } } + void serializeText(ConstAggregateDataPtr place, WriteBuffer & buf) const + { + const Data & d = this->data(place); + + if (unlikely(d.value.isNull())) + { + writeText(false, buf); + } + else + { + writeText(true, buf); + type->serializeText(this->data(place).value, buf); + } + } + + void deserializeMergeText(AggregateDataPtr place, ReadBuffer & buf) const + { + Data & d = this->data(place); + + bool is_not_null = false; + readText(is_not_null, buf); + + if (is_not_null) + { + if (!d.value.isNull()) + { + Field value_; + type->deserializeText(value_, buf); + if (Traits::better(value_, d.value)) + d.value = value_; + } + else + type->deserializeText(d.value, buf); + } + } + void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const { if (unlikely(this->data(place).value.isNull())) diff --git a/dbms/include/DB/AggregateFunctions/IAggregateFunction.h b/dbms/include/DB/AggregateFunctions/IAggregateFunction.h index c94b3d6addb..bed901d187d 100644 --- a/dbms/include/DB/AggregateFunctions/IAggregateFunction.h +++ b/dbms/include/DB/AggregateFunctions/IAggregateFunction.h @@ -82,11 +82,23 @@ public: /// Десериализовать состояние и объединить своё состояние с ним. virtual void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const = 0; + /// Сериализовать состояние в текстовом виде (а не в бинарном, как в функции serialize). Нельзя сериализовывать "пустое" состояние. + virtual void serializeText(ConstAggregateDataPtr place, WriteBuffer & buf) const + { + throw Exception("Method serializeText is not supported for " + getName() + ".", ErrorCodes::NOT_IMPLEMENTED); + } + + /// Десериализовать текстовое состояние и объединить своё состояние с ним. + virtual void deserializeMergeText(AggregateDataPtr place, ReadBuffer & buf) const + { + throw Exception("Method deserializeMergeText is not supported for " + getName() + ".", ErrorCodes::NOT_IMPLEMENTED); + } + /// Вставить результат в столбец. virtual void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const = 0; /// Можно ли вызывать метод insertResultInto, или всегда нужно запоминать состояние. - virtual bool isFinal() const { return true; } + virtual bool canBeFinal() const { return true; } }; diff --git a/dbms/include/DB/Columns/ColumnAggregateFunction.h b/dbms/include/DB/Columns/ColumnAggregateFunction.h index f7a93755cbf..7aa60c49226 100644 --- a/dbms/include/DB/Columns/ColumnAggregateFunction.h +++ b/dbms/include/DB/Columns/ColumnAggregateFunction.h @@ -6,6 +6,9 @@ #include +#include + +#include namespace DB { @@ -78,31 +81,40 @@ public: Field operator[](size_t n) const { - String buffer_string; - WriteBufferFromString buffer(buffer_string); - func->serialize(data[n], buffer); - return Field(buffer_string); + Field field = String(); + { + WriteBufferFromString buffer(field.get()); + func->serialize(data[n], buffer); + } + return field; } void get(size_t n, Field & res) const { - String buffer_string; - WriteBufferFromString buffer(buffer_string); - func->serialize(data[n], buffer); - res = buffer_string; + res.assignString("", 0); + { + WriteBufferFromString buffer(res.get()); + func->serialize(data[n], buffer); + } } StringRef getDataAt(size_t n) const { - String buffer_string; - WriteBufferFromString buffer(buffer_string); - func->serialize(data[n], buffer); - return StringRef(buffer_string); + return StringRef(reinterpret_cast(&data[n]), sizeof(data[n])); + } + + void insert(const Field & x) + { + insertDefault(); + ReadBufferFromString read_buffer(x.safeGet()); + func->deserializeMerge(data[data.size()-1], read_buffer); } void insertData(const char * pos, size_t length) { - throw Exception("Method insertData is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED); + insertDefault(); + ReadBuffer read_buffer(const_cast(pos), length); + func->deserializeMerge(data[data.size()-1], read_buffer); } ColumnPtr cut(size_t start, size_t length) const @@ -125,12 +137,6 @@ public: throw Exception("Method replicate is not supported for ColumnAggregateFunction.", ErrorCodes::NOT_IMPLEMENTED); } - void insert(const Field & x) - { - throw Exception("Method insert is not supported for ColumnAggregateFunction. You must access underlying vector directly.", - ErrorCodes::NOT_IMPLEMENTED); - } - void getExtremes(Field & min, Field & max) const { throw Exception("Method getExtremes is not supported for ColumnAggregateFunction.", ErrorCodes::NOT_IMPLEMENTED); diff --git a/dbms/include/DB/DataTypes/DataTypeAggregateFunction.h b/dbms/include/DB/DataTypes/DataTypeAggregateFunction.h index 2f244ce7d29..7cc1422d626 100644 --- a/dbms/include/DB/DataTypes/DataTypeAggregateFunction.h +++ b/dbms/include/DB/DataTypes/DataTypeAggregateFunction.h @@ -52,7 +52,7 @@ public: return stream.str(); } - DataTypePtr getReturnType() const {return function->getReturnType(); }; + DataTypePtr getReturnType() const { return function->getReturnType(); }; DataTypes getArgumentsDataTypes() const { return argument_types; } DataTypePtr clone() const { return new DataTypeAggregateFunction(function, argument_types, parameters); } diff --git a/dbms/src/DataTypes/DataTypeAggregateFunction.cpp b/dbms/src/DataTypes/DataTypeAggregateFunction.cpp index 0f21987d176..96b8968f392 100644 --- a/dbms/src/DataTypes/DataTypeAggregateFunction.cpp +++ b/dbms/src/DataTypes/DataTypeAggregateFunction.cpp @@ -15,12 +15,19 @@ using Poco::SharedPtr; void DataTypeAggregateFunction::serializeBinary(const Field & field, WriteBuffer & ostr) const { - throw Exception("Serialization of individual aggregate functions is not supported", ErrorCodes::NOT_IMPLEMENTED); + const String & s = get(field); + writeVarUInt(s.size(), ostr); + writeString(s, ostr); } void DataTypeAggregateFunction::deserializeBinary(Field & field, ReadBuffer & istr) const { - throw Exception("Deserialization of individual aggregate functions is not supported", ErrorCodes::NOT_IMPLEMENTED); + UInt64 size; + readVarUInt(size, istr); + field = String(); + String & s = get(field); + s.resize(size); + istr.readStrict(&s[0], size); } void DataTypeAggregateFunction::serializeBinary(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit) const @@ -66,37 +73,46 @@ void DataTypeAggregateFunction::deserializeBinary(IColumn & column, ReadBuffer & void DataTypeAggregateFunction::serializeText(const Field & field, WriteBuffer & ostr) const { - throw Exception("Cannot write aggregate function as text.", ErrorCodes::CANNOT_WRITE_AGGREGATE_FUNCTION_AS_TEXT); + writeString(get(field), ostr); } + void DataTypeAggregateFunction::deserializeText(Field & field, ReadBuffer & istr) const { - throw Exception("Cannot read aggregate function from text.", ErrorCodes::CANNOT_READ_AGGREGATE_FUNCTION_FROM_TEXT); + field.assignString("", 0); + readString(get(field), istr); } + void DataTypeAggregateFunction::serializeTextEscaped(const Field & field, WriteBuffer & ostr) const { - throw Exception("Cannot write aggregate function as text.", ErrorCodes::CANNOT_WRITE_AGGREGATE_FUNCTION_AS_TEXT); + writeEscapedString(get(field), ostr); } + void DataTypeAggregateFunction::deserializeTextEscaped(Field & field, ReadBuffer & istr) const { - throw Exception("Cannot read aggregate function from text.", ErrorCodes::CANNOT_READ_AGGREGATE_FUNCTION_FROM_TEXT); + field.assignString("", 0); + readEscapedString(get(field), istr); } + void DataTypeAggregateFunction::serializeTextQuoted(const Field & field, WriteBuffer & ostr) const { - throw Exception("Cannot write aggregate function as text.", ErrorCodes::CANNOT_WRITE_AGGREGATE_FUNCTION_AS_TEXT); + writeQuotedString(get(field), ostr); } + void DataTypeAggregateFunction::deserializeTextQuoted(Field & field, ReadBuffer & istr) const { - throw Exception("Cannot read aggregate function from text.", ErrorCodes::CANNOT_READ_AGGREGATE_FUNCTION_FROM_TEXT); + field.assignString("", 0); + readQuotedString(get(field), istr); } + void DataTypeAggregateFunction::serializeTextJSON(const Field & field, WriteBuffer & ostr) const { - throw Exception("Cannot write aggregate function as text.", ErrorCodes::CANNOT_WRITE_AGGREGATE_FUNCTION_AS_TEXT); + writeJSONString(get(field), ostr); } ColumnPtr DataTypeAggregateFunction::createColumn() const diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index ef923294459..688e5672ee3 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -235,33 +235,19 @@ void Aggregator::convertToBlockImpl( size_t start_row, bool final) const { -// if (!final || !aggregate_functions[i]->isFinal()) { - size_t j = start_row; - for (typename Method::const_iterator it = method.data.begin(); it != method.data.end(); ++it, ++j) - { - method.insertKeyIntoColumns(it, key_columns, keys_size, key_sizes); - - for (size_t i = 0; i < aggregates_size; ++i) - if (!final || !aggregate_functions[i]->isFinal()) - (*aggregate_columns[i])[j] = Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]; - else - aggregate_functions[i]->insertResultInto( - Method::getAggregateData(it->second) + offsets_of_aggregate_states[i], - *final_aggregate_columns[i]); - } -/* } - else + size_t j = start_row; + for (typename Method::const_iterator it = method.data.begin(); it != method.data.end(); ++it, ++j) { - for (typename Method::const_iterator it = method.data.begin(); it != method.data.end(); ++it) - { - method.insertKeyIntoColumns(it, key_columns, keys_size, key_sizes); + method.insertKeyIntoColumns(it, key_columns, keys_size, key_sizes); - for (size_t i = 0; i < aggregates_size; ++i) + for (size_t i = 0; i < aggregates_size; ++i) + if (!final || !aggregate_functions[i]->canBeFinal()) + (*aggregate_columns[i])[j] = Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]; + else aggregate_functions[i]->insertResultInto( Method::getAggregateData(it->second) + offsets_of_aggregate_states[i], *final_aggregate_columns[i]); - } - }*/ + } } @@ -345,7 +331,8 @@ void Aggregator::destroyImpl( for (typename Method::const_iterator it = method.data.begin(); it != method.data.end(); ++it) { for (size_t i = 0; i < aggregates_size; ++i) - if (aggregate_functions[i]->isFinal()) + /// Если аггрегатная функция не может быть финализирована, то за ее удаление отвечает ColumnAggregateFunction + if (aggregate_functions[i]->canBeFinal()) { char * data = Method::getAggregateData(it->second); @@ -543,7 +530,7 @@ Block Aggregator::convertToBlock(AggregatedDataVariants & data_variants, bool fi for (size_t i = 0; i < aggregates_size; ++i) { - if (!final || !aggregate_functions[i]->isFinal()) + if (!final || !aggregate_functions[i]->canBeFinal()) { /// Столбец ColumnAggregateFunction захватывает разделяемое владение ареной с состояниями агрегатных функций. ColumnAggregateFunction & column_aggregate_func = static_cast(*res.getByPosition(i + keys_size).column); @@ -570,7 +557,7 @@ Block Aggregator::convertToBlock(AggregatedDataVariants & data_variants, bool fi AggregatedDataWithoutKey & data = data_variants.without_key; for (size_t i = 0; i < aggregates_size; ++i) - if (!final || !aggregate_functions[i]->isFinal()) + if (!final || !aggregate_functions[i]->canBeFinal()) (*aggregate_columns[i])[0] = data + offsets_of_aggregate_states[i]; else aggregate_functions[i]->insertResultInto(data + offsets_of_aggregate_states[i], *final_aggregate_columns[i]); @@ -787,7 +774,8 @@ void Aggregator::destroyAllAggregateStates(AggregatedDataVariants & result) AggregatedDataWithoutKey & res_data = result.without_key; for (size_t i = 0; i < aggregates_size; ++i) - if (aggregate_functions[i]->isFinal()) + /// Если аггрегатная функция не может быть финализирована, то за ее удаление отвечает ColumnAggregateFunction + if (aggregate_functions[i]->canBeFinal()) aggregate_functions[i]->destroy(res_data + offsets_of_aggregate_states[i]); }