dbms: more read write functions for DataTypeAggregateFunction and ColumnAggregateFunction [METR-10894]

This commit is contained in:
Sergey Fedorov 2014-05-22 18:09:10 +04:00
parent 88aec6b1d8
commit be9527f09f
9 changed files with 128 additions and 58 deletions

View File

@ -83,7 +83,7 @@ public:
{
Field field;
columns[0]->get(row_num, field);
ReadBufferFromString read_buffer(field.safeGet<String>());
ReadBufferFromString read_buffer(field.safeGet<String &>());
nested_func->deserializeMerge(place, read_buffer);
}

View File

@ -101,7 +101,7 @@ public:
}
/// Для аггрегатных функции типа state никогда не нужно вызывать insertResultInto
bool isFinal() const { return false; }
bool canBeFinal() const { return false; }
};

View File

@ -62,6 +62,18 @@ public:
this->data(place).sum += tmp;
}
void serializeText(ConstAggregateDataPtr place, WriteBuffer & buf) const
{
writeText(this->data(place).sum, buf);
}
void deserializeMergeText(AggregateDataPtr place, ReadBuffer & buf) const
{
typename NearestFieldType<T>::Type tmp;
readText(tmp, buf);
this->data(place).sum += tmp;
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const
{
static_cast<ColumnVector<typename NearestFieldType<T>::Type> &>(to).getData().push_back(this->data(place).sum);

View File

@ -116,6 +116,42 @@ public:
}
}
void serializeText(ConstAggregateDataPtr place, WriteBuffer & buf) const
{
const Data & d = this->data(place);
if (unlikely(d.value.isNull()))
{
writeText(false, buf);
}
else
{
writeText(true, buf);
type->serializeText(this->data(place).value, buf);
}
}
void deserializeMergeText(AggregateDataPtr place, ReadBuffer & buf) const
{
Data & d = this->data(place);
bool is_not_null = false;
readText(is_not_null, buf);
if (is_not_null)
{
if (!d.value.isNull())
{
Field value_;
type->deserializeText(value_, buf);
if (Traits::better(value_, d.value))
d.value = value_;
}
else
type->deserializeText(d.value, buf);
}
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const
{
if (unlikely(this->data(place).value.isNull()))

View File

@ -82,11 +82,23 @@ public:
/// Десериализовать состояние и объединить своё состояние с ним.
virtual void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const = 0;
/// Сериализовать состояние в текстовом виде (а не в бинарном, как в функции serialize). Нельзя сериализовывать "пустое" состояние.
virtual void serializeText(ConstAggregateDataPtr place, WriteBuffer & buf) const
{
throw Exception("Method serializeText is not supported for " + getName() + ".", ErrorCodes::NOT_IMPLEMENTED);
}
/// Десериализовать текстовое состояние и объединить своё состояние с ним.
virtual void deserializeMergeText(AggregateDataPtr place, ReadBuffer & buf) const
{
throw Exception("Method deserializeMergeText is not supported for " + getName() + ".", ErrorCodes::NOT_IMPLEMENTED);
}
/// Вставить результат в столбец.
virtual void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const = 0;
/// Можно ли вызывать метод insertResultInto, или всегда нужно запоминать состояние.
virtual bool isFinal() const { return true; }
virtual bool canBeFinal() const { return true; }
};

View File

@ -6,6 +6,9 @@
#include <DB/Columns/ColumnVector.h>
#include <DB/Core/Field.h>
#include <DB/IO/ReadBufferFromString.h>
namespace DB
{
@ -78,31 +81,40 @@ public:
Field operator[](size_t n) const
{
String buffer_string;
WriteBufferFromString buffer(buffer_string);
Field field = String();
{
WriteBufferFromString buffer(field.get<String &>());
func->serialize(data[n], buffer);
return Field(buffer_string);
}
return field;
}
void get(size_t n, Field & res) const
{
String buffer_string;
WriteBufferFromString buffer(buffer_string);
res.assignString("", 0);
{
WriteBufferFromString buffer(res.get<String &>());
func->serialize(data[n], buffer);
res = buffer_string;
}
}
StringRef getDataAt(size_t n) const
{
String buffer_string;
WriteBufferFromString buffer(buffer_string);
func->serialize(data[n], buffer);
return StringRef(buffer_string);
return StringRef(reinterpret_cast<const char *>(&data[n]), sizeof(data[n]));
}
void insert(const Field & x)
{
insertDefault();
ReadBufferFromString read_buffer(x.safeGet<const String &>());
func->deserializeMerge(data[data.size()-1], read_buffer);
}
void insertData(const char * pos, size_t length)
{
throw Exception("Method insertData is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED);
insertDefault();
ReadBuffer read_buffer(const_cast<char *>(pos), length);
func->deserializeMerge(data[data.size()-1], read_buffer);
}
ColumnPtr cut(size_t start, size_t length) const
@ -125,12 +137,6 @@ public:
throw Exception("Method replicate is not supported for ColumnAggregateFunction.", ErrorCodes::NOT_IMPLEMENTED);
}
void insert(const Field & x)
{
throw Exception("Method insert is not supported for ColumnAggregateFunction. You must access underlying vector directly.",
ErrorCodes::NOT_IMPLEMENTED);
}
void getExtremes(Field & min, Field & max) const
{
throw Exception("Method getExtremes is not supported for ColumnAggregateFunction.", ErrorCodes::NOT_IMPLEMENTED);

View File

@ -52,7 +52,7 @@ public:
return stream.str();
}
DataTypePtr getReturnType() const {return function->getReturnType(); };
DataTypePtr getReturnType() const { return function->getReturnType(); };
DataTypes getArgumentsDataTypes() const { return argument_types; }
DataTypePtr clone() const { return new DataTypeAggregateFunction(function, argument_types, parameters); }

View File

@ -15,12 +15,19 @@ using Poco::SharedPtr;
void DataTypeAggregateFunction::serializeBinary(const Field & field, WriteBuffer & ostr) const
{
throw Exception("Serialization of individual aggregate functions is not supported", ErrorCodes::NOT_IMPLEMENTED);
const String & s = get<const String &>(field);
writeVarUInt(s.size(), ostr);
writeString(s, ostr);
}
void DataTypeAggregateFunction::deserializeBinary(Field & field, ReadBuffer & istr) const
{
throw Exception("Deserialization of individual aggregate functions is not supported", ErrorCodes::NOT_IMPLEMENTED);
UInt64 size;
readVarUInt(size, istr);
field = String();
String & s = get<String &>(field);
s.resize(size);
istr.readStrict(&s[0], size);
}
void DataTypeAggregateFunction::serializeBinary(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit) const
@ -66,37 +73,46 @@ void DataTypeAggregateFunction::deserializeBinary(IColumn & column, ReadBuffer &
void DataTypeAggregateFunction::serializeText(const Field & field, WriteBuffer & ostr) const
{
throw Exception("Cannot write aggregate function as text.", ErrorCodes::CANNOT_WRITE_AGGREGATE_FUNCTION_AS_TEXT);
writeString(get<const String &>(field), ostr);
}
void DataTypeAggregateFunction::deserializeText(Field & field, ReadBuffer & istr) const
{
throw Exception("Cannot read aggregate function from text.", ErrorCodes::CANNOT_READ_AGGREGATE_FUNCTION_FROM_TEXT);
field.assignString("", 0);
readString(get<String &>(field), istr);
}
void DataTypeAggregateFunction::serializeTextEscaped(const Field & field, WriteBuffer & ostr) const
{
throw Exception("Cannot write aggregate function as text.", ErrorCodes::CANNOT_WRITE_AGGREGATE_FUNCTION_AS_TEXT);
writeEscapedString(get<const String &>(field), ostr);
}
void DataTypeAggregateFunction::deserializeTextEscaped(Field & field, ReadBuffer & istr) const
{
throw Exception("Cannot read aggregate function from text.", ErrorCodes::CANNOT_READ_AGGREGATE_FUNCTION_FROM_TEXT);
field.assignString("", 0);
readEscapedString(get<String &>(field), istr);
}
void DataTypeAggregateFunction::serializeTextQuoted(const Field & field, WriteBuffer & ostr) const
{
throw Exception("Cannot write aggregate function as text.", ErrorCodes::CANNOT_WRITE_AGGREGATE_FUNCTION_AS_TEXT);
writeQuotedString(get<const String &>(field), ostr);
}
void DataTypeAggregateFunction::deserializeTextQuoted(Field & field, ReadBuffer & istr) const
{
throw Exception("Cannot read aggregate function from text.", ErrorCodes::CANNOT_READ_AGGREGATE_FUNCTION_FROM_TEXT);
field.assignString("", 0);
readQuotedString(get<String &>(field), istr);
}
void DataTypeAggregateFunction::serializeTextJSON(const Field & field, WriteBuffer & ostr) const
{
throw Exception("Cannot write aggregate function as text.", ErrorCodes::CANNOT_WRITE_AGGREGATE_FUNCTION_AS_TEXT);
writeJSONString(get<const String &>(field), ostr);
}
ColumnPtr DataTypeAggregateFunction::createColumn() const

View File

@ -235,33 +235,19 @@ void Aggregator::convertToBlockImpl(
size_t start_row,
bool final) const
{
// if (!final || !aggregate_functions[i]->isFinal()) {
size_t j = start_row;
for (typename Method::const_iterator it = method.data.begin(); it != method.data.end(); ++it, ++j)
{
method.insertKeyIntoColumns(it, key_columns, keys_size, key_sizes);
for (size_t i = 0; i < aggregates_size; ++i)
if (!final || !aggregate_functions[i]->isFinal())
if (!final || !aggregate_functions[i]->canBeFinal())
(*aggregate_columns[i])[j] = Method::getAggregateData(it->second) + offsets_of_aggregate_states[i];
else
aggregate_functions[i]->insertResultInto(
Method::getAggregateData(it->second) + offsets_of_aggregate_states[i],
*final_aggregate_columns[i]);
}
/* }
else
{
for (typename Method::const_iterator it = method.data.begin(); it != method.data.end(); ++it)
{
method.insertKeyIntoColumns(it, key_columns, keys_size, key_sizes);
for (size_t i = 0; i < aggregates_size; ++i)
aggregate_functions[i]->insertResultInto(
Method::getAggregateData(it->second) + offsets_of_aggregate_states[i],
*final_aggregate_columns[i]);
}
}*/
}
@ -345,7 +331,8 @@ void Aggregator::destroyImpl(
for (typename Method::const_iterator it = method.data.begin(); it != method.data.end(); ++it)
{
for (size_t i = 0; i < aggregates_size; ++i)
if (aggregate_functions[i]->isFinal())
/// Если аггрегатная функция не может быть финализирована, то за ее удаление отвечает ColumnAggregateFunction
if (aggregate_functions[i]->canBeFinal())
{
char * data = Method::getAggregateData(it->second);
@ -543,7 +530,7 @@ Block Aggregator::convertToBlock(AggregatedDataVariants & data_variants, bool fi
for (size_t i = 0; i < aggregates_size; ++i)
{
if (!final || !aggregate_functions[i]->isFinal())
if (!final || !aggregate_functions[i]->canBeFinal())
{
/// Столбец ColumnAggregateFunction захватывает разделяемое владение ареной с состояниями агрегатных функций.
ColumnAggregateFunction & column_aggregate_func = static_cast<ColumnAggregateFunction &>(*res.getByPosition(i + keys_size).column);
@ -570,7 +557,7 @@ Block Aggregator::convertToBlock(AggregatedDataVariants & data_variants, bool fi
AggregatedDataWithoutKey & data = data_variants.without_key;
for (size_t i = 0; i < aggregates_size; ++i)
if (!final || !aggregate_functions[i]->isFinal())
if (!final || !aggregate_functions[i]->canBeFinal())
(*aggregate_columns[i])[0] = data + offsets_of_aggregate_states[i];
else
aggregate_functions[i]->insertResultInto(data + offsets_of_aggregate_states[i], *final_aggregate_columns[i]);
@ -787,7 +774,8 @@ void Aggregator::destroyAllAggregateStates(AggregatedDataVariants & result)
AggregatedDataWithoutKey & res_data = result.without_key;
for (size_t i = 0; i < aggregates_size; ++i)
if (aggregate_functions[i]->isFinal())
/// Если аггрегатная функция не может быть финализирована, то за ее удаление отвечает ColumnAggregateFunction
if (aggregate_functions[i]->canBeFinal())
aggregate_functions[i]->destroy(res_data + offsets_of_aggregate_states[i]);
}