dbms: continuing improvement (incomplete) [#CONV-2944].

This commit is contained in:
Alexey Milovidov 2013-02-08 20:34:30 +00:00
parent d4c64603a2
commit e9a9124069
7 changed files with 87 additions and 88 deletions

View File

@ -11,25 +11,35 @@ namespace DB
{
/** Столбец, хранящий состояния агрегатных функций.
* Состояния агрегатных функций хранятся в пуле (arena), а в массиве (ColumnVector) хранятся указатели на них.
* Состояния агрегатных функций хранятся в пуле (arena),
* (возможно, в нескольких)
* а в массиве (ColumnVector) хранятся указатели на них.
* Столбец захватывает владение пулом и всеми агрегатными функциями,
* которые в него переданы (уничтожает их в дестркуторе).
*/
class ColumnAggregateFunction : public ColumnVector<AggregateDataPtr>
{
private:
typedef SharedPtr<Arena> ArenaPtr;
typedef std::vector<ArenaPtr> Arenas;
const AggregateFunctionPtr func;
SharedPtr<Arena> arena;
Arenas arenas;
public:
ColumnAggregateFunction(AggregateFunctionPtr & func_, SharedPtr<Arena> & arena_)
ColumnAggregateFunction(AggregateFunctionPtr & func_)
{
set(func_, arena_);
set(func_);
}
void set(AggregateFunctionPtr & func_, SharedPtr<Arena> & arena_)
void set(AggregateFunctionPtr & func_)
{
func = func_;
arena = arena_;
}
/// Захватить владение ареной.
void addArena(ArenaPtr & arena_)
{
arenas.push_back(arena_);
}
~ColumnAggregateFunction()

View File

@ -44,11 +44,6 @@ public:
return new DataTypeString;
}
DataTypePtr operator() (const AggregateFunctionPlainPtr & x) const
{
throw Exception("Cannot get DataType for AggregateFunction Field", ErrorCodes::NOT_IMPLEMENTED);
}
DataTypePtr operator() (const Array & x) const
{
return new DataTypeArray(apply_visitor(FieldToDataType(), x.at(0)));

View File

@ -44,35 +44,6 @@ struct StringHash
};
/** Преобразование значения в 64 бита. Для чисел - однозначное, для строк - некриптографический хэш. */
class FieldVisitorToUInt64 : public StaticVisitor<UInt64>
{
public:
FieldVisitorToUInt64() {}
UInt64 operator() (const Null & x) const { return 0; }
UInt64 operator() (const UInt64 & x) const { return x; }
UInt64 operator() (const Int64 & x) const { return x; }
UInt64 operator() (const Float64 & x) const
{
UInt64 res = 0;
memcpy(reinterpret_cast<char *>(&res), reinterpret_cast<const char *>(&x), sizeof(x));
return res;
}
UInt64 operator() (const String & x) const
{
return CityHash64(x.data(), x.size());
}
UInt64 operator() (const Array & x) const
{
throw Exception("Cannot aggregate by array", ErrorCodes::ILLEGAL_KEY_OF_AGGREGATION);
}
};
typedef std::vector<size_t> Sizes;

View File

@ -49,6 +49,9 @@ typedef HashMap<UInt128, std::pair<Row, AggregateDataPtr>, UInt128Hash, UInt128Z
struct AggregatedDataVariants
{
/// Пул для состояний агрегатных функций. Владение потом будет передано в ColumnAggregateFunction.
SharedPtr<Arena> aggregates_pool;
/// Наиболее общий вариант. Самый медленный. На данный момент, не используется.
AggregatedData generic;
@ -80,7 +83,7 @@ struct AggregatedDataVariants
};
Type type;
AggregatedDataVariants() : type(EMPTY) {}
AggregatedDataVariants() : aggregates_pool(new Arena), without_key(NULL), type(EMPTY) {}
bool empty() const { return type == EMPTY; }
size_t size() const
@ -111,7 +114,8 @@ class Aggregator
public:
Aggregator(const ColumnNumbers & keys_, AggregateDescriptions & aggregates_,
size_t max_rows_to_group_by_ = 0, Limits::OverflowMode group_by_overflow_mode_ = Limits::THROW)
: keys(keys_), aggregates(aggregates_), keys_size(keys.size()), aggregates_size(aggregates.size()), initialized(false),
: keys(keys_), aggregates(aggregates_), keys_size(keys.size()), aggregates_size(aggregates.size()),
total_size_of_aggregate_states(0), initialized(false),
max_rows_to_group_by(max_rows_to_group_by_), group_by_overflow_mode(group_by_overflow_mode_),
log(&Logger::get("Aggregator"))
{
@ -119,7 +123,8 @@ public:
Aggregator(const Names & key_names_, AggregateDescriptions & aggregates_,
size_t max_rows_to_group_by_ = 0, Limits::OverflowMode group_by_overflow_mode_ = Limits::THROW)
: key_names(key_names_), aggregates(aggregates_), keys_size(key_names.size()), aggregates_size(aggregates.size()), initialized(false),
: key_names(key_names_), aggregates(aggregates_), keys_size(key_names.size()), aggregates_size(aggregates.size()),
total_size_of_aggregate_states(0), initialized(false),
max_rows_to_group_by(max_rows_to_group_by_), group_by_overflow_mode(group_by_overflow_mode_),
log(&Logger::get("Aggregator"))
{
@ -146,9 +151,14 @@ private:
ColumnNumbers keys;
Names key_names;
AggregateDescriptions aggregates;
std::vector<IAggregateFunction *> aggregate_functions;
size_t keys_size;
size_t aggregates_size;
Sizes sizes_of_aggregate_states; /// Размеры состояний агрегатных функций - для выделения памяти для них в пуле.
Sizes offsets_of_aggregate_states; /// Смещение до n-ой агрегатной функции в строке из агрегатных функций.
size_t total_size_of_aggregate_states; /// Суммарный размер строки из агрегатных функций.
/// Для инициализации от первого блока при конкуррентном использовании.
bool initialized;
Poco::FastMutex mutex;

View File

@ -26,9 +26,6 @@ int main(int argc, char ** argv)
field = DB::Null();
std::cerr << DB::apply_visitor(to_string, field) << std::endl;
field = DB::Field::AggregateFunctionPlainPtr(NULL);
std::cerr << DB::apply_visitor(to_string, field) << std::endl;
DB::Field field2;
field2 = field;
std::cerr << DB::apply_visitor(to_string, field2) << std::endl;
@ -37,7 +34,6 @@ int main(int argc, char ** argv)
array.push_back(DB::UInt64(123));
array.push_back(DB::Int64(-123));
array.push_back(DB::String("Hello"));
array.push_back(DB::Field::AggregateFunctionPlainPtr(NULL));
field = array;
std::cerr << DB::apply_visitor(to_string, field) << std::endl;

View File

@ -37,17 +37,21 @@ void Aggregator::initialize(Block & block)
for (Names::const_iterator jt = it->argument_names.begin(); jt != it->argument_names.end(); ++jt)
it->arguments.push_back(block.getPositionByName(*jt));
aggregate_functions.resize(aggregates_size);
for (size_t i = 0; i < aggregates_size; ++i)
aggregate_functions[i] = *aggregates[i].function;
/// Создадим пример блока, описывающего результат
if (!sample)
{
for (size_t i = 0, size = keys_size; i < size; ++i)
for (size_t i = 0; i < keys_size; ++i)
{
sample.insert(block.getByPosition(keys[i]).cloneEmpty());
if (sample.getByPosition(i).column->isConst())
sample.getByPosition(i).column = dynamic_cast<IColumnConst &>(*sample.getByPosition(i).column).convertToFullColumn();
}
for (size_t i = 0, size = aggregates_size; i < size; ++i)
for (size_t i = 0; i < aggregates_size; ++i)
{
ColumnWithNameAndType col;
col.name = aggregates[i].column_name;
@ -58,7 +62,7 @@ void Aggregator::initialize(Block & block)
argument_types[j] = block.getByPosition(aggregates[i].arguments[j]).type;
col.type = new DataTypeAggregateFunction(aggregates[i].function, argument_types);
col.column = new ColumnAggregateFunction;
col.column = new ColumnAggregateFunction(aggregates[i].function);
sample.insert(col);
}
@ -68,6 +72,18 @@ void Aggregator::initialize(Block & block)
for (size_t i = 0; i < columns; ++i)
if (block.getByPosition(i).column->isConst())
sample.insert(block.getByPosition(i).cloneEmpty());
/// Инициализируем размеры состояний и смещения для агрегатных функций.
sizes_of_aggregate_states.resize(aggregates_size);
offsets_of_aggregate_states.resize(aggregates_size);
total_size_of_aggregate_states = 0;
for (size_t i = 0; i < aggregates_size; ++i)
{
sizes_of_aggregate_states[i] = aggregates[i].function->sizeOfData();
offsets_of_aggregate_states[i] = current_offset;
total_size_of_aggregate_states += sizes_of_aggregate_states[i];
}
}
}
@ -168,17 +184,18 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re
if (result.type == AggregatedDataVariants::WITHOUT_KEY)
{
AggregatedDataWithoutKey & res = result.without_key;
if (res.empty())
if (!res)
{
res.resize(aggregates_size);
res = result.aggregates_pool->alloc(total_size_of_aggregate_states);
for (size_t i = 0; i < aggregates_size; ++i)
res[i] = aggregates[i].function->cloneEmpty();
aggregates[i].function->create(res + offsets_of_aggregate_states[i]);
}
/// Оптимизация в случае единственной агрегатной функции count.
AggregateFunctionCount * agg_count = dynamic_cast<AggregateFunctionCount *>(res[0]);
AggregateFunctionCount * agg_count = dynamic_cast<AggregateFunctionCount *>(aggregate_functions[0]);
if (aggregates_size == 1 && agg_count)
agg_count->addDelta(rows);
agg_count->addDelta(res, rows);
else
{
for (size_t i = 0; i < rows; ++i)
@ -189,7 +206,7 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re
for (size_t k = 0, size = aggregate_arguments[j].size(); k < size; ++k)
aggregate_columns[j][k]->get(i, aggregate_arguments[j][k]);
res[j]->add(aggregate_arguments[j]);
aggregate_functions[j]->add(res + offsets_of_aggregate_states[j], aggregate_arguments[j]);
}
}
}
@ -197,15 +214,13 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re
else if (result.type == AggregatedDataVariants::KEY_64)
{
AggregatedDataWithUInt64Key & res = result.key64;
const FieldVisitorToUInt64 visitor;
const IColumn & column = *key_columns[0];
/// Для всех строчек
for (size_t i = 0; i < rows; ++i)
{
/// Строим ключ
Field field = column[i];
UInt64 key = apply_visitor(visitor, field);
UInt64 key = get<UInt64>(column[i]);
AggregatedDataWithUInt64Key::iterator it;
bool inserted;
@ -222,10 +237,10 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re
if (inserted)
{
new(&it->second) AggregateFunctionsPlainPtrs(aggregates_size);
it->second = result.aggregates_pool->alloc(total_size_of_aggregate_states);
for (size_t j = 0; j < aggregates_size; ++j)
it->second[j] = aggregates[j].function->cloneEmpty();
aggregate_functions[j]->create(it->second + offsets_of_aggregate_states[j]);
}
/// Добавляем значения
@ -234,7 +249,7 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re
for (size_t k = 0, size = aggregate_arguments[j].size(); k < size; ++k)
aggregate_columns[j][k]->get(i, aggregate_arguments[j][k]);
it->second[j]->add(aggregate_arguments[j]);
aggregate_functions[j]->add(it->second + offsets_of_aggregate_states[j], aggregate_arguments[j]);
}
}
}
@ -270,10 +285,10 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re
if (inserted)
{
it->first.data = result.string_pool.insert(ref.data, ref.size);
new(&it->second) AggregateFunctionsPlainPtrs(aggregates_size);
it->second = result.aggregates_pool->alloc(total_size_of_aggregate_states);
for (size_t j = 0; j < aggregates_size; ++j)
it->second[j] = aggregates[j].function->cloneEmpty();
aggregate_functions[j]->create(it->second + offsets_of_aggregate_states[j]);
}
/// Добавляем значения
@ -282,7 +297,7 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re
for (size_t k = 0, size = aggregate_arguments[j].size(); k < size; ++k)
aggregate_columns[j][k]->get(i, aggregate_arguments[j][k]);
it->second[j]->add(aggregate_arguments[j]);
aggregate_functions[j]->add(it->second + offsets_of_aggregate_states[j], aggregate_arguments[j]);
}
}
}
@ -313,10 +328,10 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re
if (inserted)
{
it->first.data = result.string_pool.insert(ref.data, ref.size);
new(&it->second) AggregateFunctionsPlainPtrs(aggregates_size);
it->second = result.aggregates_pool->alloc(total_size_of_aggregate_states);
for (size_t j = 0; j < aggregates_size; ++j)
it->second[j] = aggregates[j].function->cloneEmpty();
aggregate_functions[j]->create(it->second + offsets_of_aggregate_states[j]);
}
/// Добавляем значения
@ -325,7 +340,7 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re
for (size_t k = 0, size = aggregate_arguments[j].size(); k < size; ++k)
aggregate_columns[j][k]->get(i, aggregate_arguments[j][k]);
it->second[j]->add(aggregate_arguments[j]);
aggregate_functions[j]->add(it->second + offsets_of_aggregate_states[j], aggregate_arguments[j]);
}
}
}
@ -355,11 +370,12 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re
if (inserted)
{
new(&it->second) AggregatedDataHashed::mapped_type(key, AggregateFunctionsPlainPtrs(aggregates_size));
new(&it->second) AggregatedDataHashed::mapped_type(key, NULL);
it->second.second = result.aggregates_pool->alloc(total_size_of_aggregate_states);
key.resize(keys_size);
for (size_t j = 0; j < aggregates_size; ++j)
it->second.second[j] = aggregates[j].function->cloneEmpty();
aggregate_functions[j]->create(it->second.second + offsets_of_aggregate_states[j]);
}
/// Добавляем значения
@ -368,7 +384,7 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re
for (size_t k = 0, size = aggregate_arguments[j].size(); k < size; ++k)
aggregate_columns[j][k]->get(i, aggregate_arguments[j][k]);
it->second.second[j]->add(aggregate_arguments[j]);
aggregate_functions[j]->add(it->second.second + offsets_of_aggregate_states[j], aggregate_arguments[j]);
}
}
}
@ -390,11 +406,12 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re
if (no_more_keys)
continue;
it = res.insert(std::make_pair(key, AggregateFunctionsPlainPtrs(aggregates_size))).first;
it = res.insert(std::make_pair(key, NULL)).first;
it->second = result.aggregates_pool->alloc(total_size_of_aggregate_states);
key.resize(keys_size);
for (size_t j = 0; j < aggregates_size; ++j)
it->second[j] = aggregates[j].function->cloneEmpty();
aggregate_functions[j]->create(it->second + offsets_of_aggregate_states[j]);
}
/// Добавляем значения
@ -403,7 +420,7 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re
for (size_t k = 0, size = aggregate_arguments[j].size(); k < size; ++k)
aggregate_columns[j][k]->get(i, aggregate_arguments[j][k]);
it->second[j]->add(aggregate_arguments[j]);
aggregate_functions[j]->add(it->second + offsets_of_aggregate_states[j], aggregate_arguments[j]);
}
}
}
@ -461,7 +478,13 @@ Block Aggregator::convertToBlock(AggregatedDataVariants & data_variants)
for (size_t i = 0; i < aggregates_size; ++i)
{
aggregate_columns[i] = &static_cast<ColumnAggregateFunction &>(*res.getByPosition(i + keys_size).column).getData();
/// Столбец ColumnAggregateFunction захватывает разделяемое владение ареной с состояниями агрегатных функций.
ColumnAggregateFunction & column_aggregate_func = static_cast<ColumnAggregateFunction &>(*res.getByPosition(i + keys_size).column);
column_aggregate_func.addArena(data_variants.aggregates_pool);
//
aggregate_columns[i] = &column_aggregate_func.getData();
aggregate_columns[i]->resize(rows);
}
@ -470,7 +493,7 @@ Block Aggregator::convertToBlock(AggregatedDataVariants & data_variants)
AggregatedDataWithoutKey & data = data_variants.without_key;
for (size_t i = 0; i < aggregates_size; ++i)
(*aggregate_columns[i])[0] = data[i];
(*aggregate_columns[i])[0] = data + offsets_of_aggregate_states[i];
}
else if (data_variants.type == AggregatedDataVariants::KEY_64)
{
@ -755,15 +778,13 @@ void Aggregator::merge(BlockInputStreamPtr stream, AggregatedDataVariants & resu
else if (result.type == AggregatedDataVariants::KEY_64)
{
AggregatedDataWithUInt64Key & res = result.key64;
const FieldVisitorToUInt64 visitor;
const IColumn & column = *key_columns[0];
/// Для всех строчек
for (size_t i = 0; i < rows; ++i)
{
/// Строим ключ
Field field = column[i];
UInt64 key = apply_visitor(visitor, field);
UInt64 key = get<UInt64>(column[i]);
AggregatedDataWithUInt64Key::iterator it;
bool inserted;

View File

@ -87,15 +87,13 @@ void Set::create(BlockInputStreamPtr stream)
if (type == KEY_64)
{
SetUInt64 & res = key64;
const FieldVisitorToUInt64 visitor;
const IColumn & column = *key_columns[0];
/// Для всех строчек
for (size_t i = 0; i < rows; ++i)
{
/// Строим ключ
Field field = column[i];
UInt64 key = apply_visitor(visitor, field);
UInt64 key = get<UInt64>(column[i]);
res.insert(key);
}
@ -297,15 +295,13 @@ void Set::execute(Block & block, const ColumnNumbers & arguments, size_t result,
if (type == KEY_64)
{
const SetUInt64 & set = key64;
const FieldVisitorToUInt64 visitor;
const IColumn & column = *key_columns[0];
/// Для всех строчек
for (size_t i = 0; i < rows; ++i)
{
/// Строим ключ
Field field = column[i];
UInt64 key = apply_visitor(visitor, field);
UInt64 key = get<UInt64>(column[i]);
vec_res[i] = negative ^ (set.end() != set.find(key));
}
}