This commit is contained in:
Sergey Fedorov 2014-05-28 18:54:42 +04:00
parent 4fefb5e916
commit 4fe8f37e03
9 changed files with 87 additions and 40 deletions

View File

@ -81,7 +81,7 @@ public:
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num) const
{
merge(place, columns[0]->getDataAt(row_num).data);
nested_func->merge(place, columns[0]->getDataAt(row_num).data);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const

View File

@ -5,6 +5,7 @@
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataTypes/DataTypeAggregateFunction.h>
#include <DB/AggregateFunctions/IAggregateFunction.h>
#include <DB/Columns/ColumnAggregateFunction.h>
namespace DB
@ -97,7 +98,7 @@ public:
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const
{
throw Exception("Aggregate function " + getName() + " doesn't support insertResultInto method", ErrorCodes::NOT_IMPLEMENTED);
static_cast<ColumnAggregateFunction &>(to).insertData(place, 0);
}
/// Для аггрегатных функции типа state никогда не нужно вызывать insertResultInto

View File

@ -100,34 +100,47 @@ public:
StringRef getDataAt(size_t n) const
{
return StringRef(reinterpret_cast<const char *>(&data[n]), sizeof(data[n]));
return StringRef(data[n], sizeof(data[n]));
}
/// Объединить состояние в последней строке с заданным
void insertMerge(const Field & x)
void insertMerge(StringRef input)
{
ReadBufferFromString read_buffer(x.safeGet<const String &>());
func->deserializeMerge(data.back(), read_buffer);
func->merge(data.back(), input.data);
}
void insert(const Field & x)
{
data.push_back(AggregateDataPtr());
func->create(data.back());
insertMerge(x);
ReadBufferFromString read_buffer(x.safeGet<const String &>());
func->deserializeMerge(data.back(), read_buffer);
}
void insertData(const char * pos, size_t length)
{
data.push_back(AggregateDataPtr());
func->create(data.back());
ReadBuffer read_buffer(const_cast<char *>(pos), length);
func->deserializeMerge(data.back(), read_buffer);
data.push_back(*reinterpret_cast<const AggregateDataPtr *>(pos));
// For debugging:
// AggregateDataPtr tmp = AggregateDataPtr();
// func->create(tmp);
// func->merge(tmp, data.back());
}
ColumnPtr cut(size_t start, size_t length) const
{
throw Exception("Method cut is not supported for ColumnAggregateFunction.", ErrorCodes::NOT_IMPLEMENTED);
if (start + length > this->data.size())
throw Exception("Parameters start = "
+ toString(start) + ", length = "
+ toString(length) + " are out of bound in ColumnAggregateFunction::cut() method"
" (data.size() = " + toString(this->data.size()) + ").",
ErrorCodes::PARAMETER_OUT_OF_BOUND);
ColumnAggregateFunction * res_ = new ColumnAggregateFunction(func, arenas);
ColumnPtr res = res_;
res_->data.resize(length);
for (size_t i = 0; i < length; ++i)
res_->data[i] = this->data[start+i];
return res;
}
ColumnPtr filter(const Filter & filter) const
@ -137,7 +150,22 @@ public:
ColumnPtr permute(const Permutation & perm, size_t limit) const
{
throw Exception("Method permute is not supported for ColumnAggregateFunction.", ErrorCodes::NOT_IMPLEMENTED);
size_t size = this->data.size();
if (limit == 0)
limit = size;
else
limit = std::min(size, limit);
if (perm.size() < limit)
throw Exception("Size of permutation is less than required.", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
ColumnAggregateFunction * res_ = new ColumnAggregateFunction(func, arenas);
ColumnPtr res = res_;
res_->data.resize(limit);
for (size_t i = 0; i < limit; ++i)
res_->data[i] = this->data[perm[i]];
return res;
}
ColumnPtr replicate(const Offsets_t & offsets) const

View File

@ -79,7 +79,7 @@ private:
for (size_t i = 0, size = column_numbers_to_aggregate.size(); i < size; ++i)
{
size_t j = column_numbers_to_aggregate[i];
column_to_aggregate[i]->insertMerge((*cursor->all_columns[j])[cursor->pos]);
column_to_aggregate[i]->insertMerge((*cursor->all_columns[j]).getDataAt(cursor->pos));
}
}
};

View File

@ -470,7 +470,6 @@ protected:
Names key_names;
AggregateDescriptions aggregates;
std::vector<IAggregateFunction *> aggregate_functions;
std::vector<char> is_final;
size_t keys_size;
size_t aggregates_size;
/// Нужно ли класть в AggregatedDataVariants::without_key агрегаты для ключей, не попавших в max_rows_to_group_by.
@ -528,7 +527,7 @@ protected:
AggregateColumnsData & aggregate_columns,
ColumnPlainPtrs & final_aggregate_columns,
const Sizes & key_sizes,
size_t start_row) const;
size_t start_row, bool final) const;
template <typename Method>
void mergeDataImpl(

View File

@ -381,8 +381,9 @@ public:
UInt64 getMaxDataPartIndex();
std::string getTableName() const { throw Exception("Logical error: calling method getTableName of not a table.",
ErrorCodes::LOGICAL_ERROR); }
std::string getTableName() const {
return "abc";//throw Exception("Logical error: calling method getTableName of not a table.", ErrorCodes::LOGICAL_ERROR);
}
const NamesAndTypesList & getColumnsList() const { return *columns; }

View File

@ -39,7 +39,6 @@ void Aggregator::initialize(Block & block)
initialized = true;
aggregate_functions.resize(aggregates_size);
is_final.assign(aggregates_size, false);
for (size_t i = 0; i < aggregates_size; ++i)
aggregate_functions[i] = &*aggregates[i].function;
@ -233,7 +232,10 @@ void Aggregator::convertToBlockImpl(
AggregateColumnsData & aggregate_columns,
ColumnPlainPtrs & final_aggregate_columns,
const Sizes & key_sizes,
size_t start_row) const
size_t start_row,
bool final) const
{
if (!final)
{
size_t j = start_row;
for (typename Method::const_iterator it = method.data.begin(); it != method.data.end(); ++it, ++j)
@ -241,14 +243,22 @@ void Aggregator::convertToBlockImpl(
method.insertKeyIntoColumns(it, key_columns, keys_size, key_sizes);
for (size_t i = 0; i < aggregates_size; ++i)
if (!is_final[i])
(*aggregate_columns[i])[j] = Method::getAggregateData(it->second) + offsets_of_aggregate_states[i];
}
}
else
{
for (typename Method::const_iterator it = method.data.begin(); it != method.data.end(); ++it)
{
method.insertKeyIntoColumns(it, key_columns, keys_size, key_sizes);
for (size_t i = 0; i < aggregates_size; ++i)
aggregate_functions[i]->insertResultInto(
Method::getAggregateData(it->second) + offsets_of_aggregate_states[i],
*final_aggregate_columns[i]);
}
}
}
template <typename Method>
@ -529,12 +539,9 @@ Block Aggregator::convertToBlock(AggregatedDataVariants & data_variants, bool fi
try
{
for (size_t i = 0; i < aggregates_size; ++i)
is_final[i] = final && aggregate_functions[i]->canBeFinal();
for (size_t i = 0; i < aggregates_size; ++i)
{
if (!is_final[i])
if (!final)
{
/// Столбец ColumnAggregateFunction захватывает разделяемое владение ареной с состояниями агрегатных функций.
ColumnAggregateFunction & column_aggregate_func = static_cast<ColumnAggregateFunction &>(*res.getByPosition(i + keys_size).column);
@ -552,6 +559,14 @@ Block Aggregator::convertToBlock(AggregatedDataVariants & data_variants, bool fi
column.column = column.type->createColumn();
column.column->reserve(rows);
if (!aggregate_functions[i]->canBeFinal())
{
/// Столбец ColumnAggregateFunction захватывает разделяемое владение ареной с состояниями агрегатных функций.
ColumnAggregateFunction & column_aggregate_func = static_cast<ColumnAggregateFunction &>(*column.column);
for (size_t j = 0; j < data_variants.aggregates_pools.size(); ++j)
column_aggregate_func.addArena(data_variants.aggregates_pools[j]);
}
final_aggregate_columns[i] = column.column;
}
}
@ -561,7 +576,7 @@ Block Aggregator::convertToBlock(AggregatedDataVariants & data_variants, bool fi
AggregatedDataWithoutKey & data = data_variants.without_key;
for (size_t i = 0; i < aggregates_size; ++i)
if (!is_final[i])
if (!final)
(*aggregate_columns[i])[0] = data + offsets_of_aggregate_states[i];
else
aggregate_functions[i]->insertResultInto(data + offsets_of_aggregate_states[i], *final_aggregate_columns[i]);
@ -575,19 +590,19 @@ Block Aggregator::convertToBlock(AggregatedDataVariants & data_variants, bool fi
if (data_variants.type == AggregatedDataVariants::KEY_64)
convertToBlockImpl(*data_variants.key64, key_columns, aggregate_columns,
final_aggregate_columns, data_variants.key_sizes, start_row);
final_aggregate_columns, data_variants.key_sizes, start_row, final);
else if (data_variants.type == AggregatedDataVariants::KEY_STRING)
convertToBlockImpl(*data_variants.key_string, key_columns, aggregate_columns,
final_aggregate_columns, data_variants.key_sizes, start_row);
final_aggregate_columns, data_variants.key_sizes, start_row, final);
else if (data_variants.type == AggregatedDataVariants::KEY_FIXED_STRING)
convertToBlockImpl(*data_variants.key_fixed_string, key_columns, aggregate_columns,
final_aggregate_columns, data_variants.key_sizes, start_row);
final_aggregate_columns, data_variants.key_sizes, start_row, final);
else if (data_variants.type == AggregatedDataVariants::KEYS_128)
convertToBlockImpl(*data_variants.keys128, key_columns, aggregate_columns,
final_aggregate_columns, data_variants.key_sizes, start_row);
final_aggregate_columns, data_variants.key_sizes, start_row, final);
else if (data_variants.type == AggregatedDataVariants::HASHED)
convertToBlockImpl(*data_variants.hashed, key_columns, aggregate_columns,
final_aggregate_columns, data_variants.key_sizes, start_row);
final_aggregate_columns, data_variants.key_sizes, start_row, final);
else if (data_variants.type != AggregatedDataVariants::WITHOUT_KEY)
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
}

View File

@ -69,6 +69,7 @@ std::string MergeTreeData::getModePrefix() const
case Ordinary: return "";
case Collapsing: return "Collapsing";
case Summing: return "Summing";
case Aggregating: return "Aggregating";
default:
throw Exception("Unknown mode of operation for MergeTreeData: " + toString(mode), ErrorCodes::LOGICAL_ERROR);

View File

@ -212,6 +212,8 @@ StoragePtr StorageFactory::get(
mode = MergeTreeData::Collapsing;
else if (name_part == "Summing")
mode = MergeTreeData::Summing;
else if (name_part == "Aggregating")
mode = MergeTreeData::Aggregating;
else if (!name_part.empty())
throw Exception("Unknown storage " + name, ErrorCodes::UNKNOWN_STORAGE);