ClickHouse/dbms/src/Interpreters/Aggregator.cpp

394 lines
12 KiB
C++
Raw Normal View History

2011-09-26 07:25:22 +00:00
#include <openssl/md5.h>
2011-09-19 03:34:23 +00:00
#include <DB/DataTypes/DataTypeAggregateFunction.h>
#include <DB/Columns/ColumnAggregateFunction.h>
2011-09-26 07:25:22 +00:00
#include <DB/Columns/ColumnString.h>
#include <DB/Columns/ColumnFixedString.h>
#include <DB/Columns/ColumnsNumber.h>
2011-09-19 03:34:23 +00:00
#include <DB/Interpreters/Aggregator.h>
2011-09-19 01:42:16 +00:00
namespace DB
{
2011-09-26 14:06:19 +00:00
/** Visitor that folds a sequence of Field values into one MD5 digest.
  *
  * Apply it (via boost::apply_visitor) to every component of a composite key, then call
  * finalize() to obtain the 16-byte digest.
  *
  * Every value is serialised as a one-byte type tag followed by its payload:
  *  - Null:                   tag only;
  *  - UInt64, Int64, Float64: tag + the 8 raw bytes of the value;
  *  - String:                 tag + 64-bit length + the raw bytes of the string.
  *
  * The length prefix makes the encoding unambiguous even when strings contain zero bytes.
  * A terminating zero used as a separator would let two different tuples of strings produce
  * the same byte stream, and therefore the same digest.
  *
  * Arrays and states of aggregate functions cannot be used as aggregation keys:
  * visiting them throws Exception with code ErrorCodes::ILLEGAL_KEY_OF_AGGREGATION.
  */
class FieldVisitorHash : public boost::static_visitor<>
{
public:
    MD5_CTX state;

    FieldVisitorHash()
    {
        MD5_Init(&state);
    }

    /// Writes the digest of everything visited so far into `place` (MD5 produces 16 bytes).
    void finalize(unsigned char * place)
    {
        MD5_Final(place, &state);
    }

    void operator() (const Null &)
    {
        updateType(FieldType::Null);
    }

    void operator() (const UInt64 & x)
    {
        updateType(FieldType::UInt64);
        updateValue(x);
    }

    void operator() (const Int64 & x)
    {
        updateType(FieldType::Int64);
        updateValue(x);
    }

    void operator() (const Float64 & x)
    {
        updateType(FieldType::Float64);
        updateValue(x);
    }

    void operator() (const String & x)
    {
        updateType(FieldType::String);

        /// Length first, then the bytes: unlike a terminating zero, this stays unambiguous
        /// for strings that themselves contain zero bytes.
        const UInt64 size = x.size();
        updateValue(size);
        MD5_Update(&state, x.data(), x.size());
    }

    void operator() (const Array &)
    {
        throw Exception("Cannot aggregate by array", ErrorCodes::ILLEGAL_KEY_OF_AGGREGATION);
    }

    void operator() (const SharedPtr<IAggregateFunction> &)
    {
        throw Exception("Cannot aggregate by state of aggregate function", ErrorCodes::ILLEGAL_KEY_OF_AGGREGATION);
    }

private:
    /// Feeds the one-byte type tag, so that equal bit patterns of different types hash differently.
    void updateType(char type)
    {
        MD5_Update(&state, &type, sizeof(type));
    }

    /// Feeds the raw in-memory bytes of a fixed-size value.
    template <typename T>
    void updateValue(const T & x)
    {
        MD5_Update(&state, &x, sizeof(x));
    }
};
2011-09-26 11:05:38 +00:00
2011-09-26 07:25:22 +00:00
2011-09-26 15:22:25 +00:00
/** Преобразование значения в 64 бита. Для чисел - однозначное, для строк - некриптографический хэш. */
2011-09-26 12:50:50 +00:00
class FieldVisitorToUInt64 : public boost::static_visitor<UInt64>
2011-09-26 07:25:22 +00:00
{
public:
2011-10-03 06:22:37 +00:00
FieldVisitorToUInt64() {}
2011-09-26 07:25:22 +00:00
UInt64 operator() (const Null & x) const { return 0; }
UInt64 operator() (const UInt64 & x) const { return x; }
UInt64 operator() (const Int64 & x) const { return x; }
UInt64 operator() (const Float64 & x) const
{
UInt64 res = 0;
memcpy(reinterpret_cast<char *>(&res), reinterpret_cast<const char *>(&x), sizeof(x));
return res;
}
2011-09-26 15:22:25 +00:00
2011-09-26 07:25:22 +00:00
UInt64 operator() (const String & x) const
{
2011-09-26 15:22:25 +00:00
return std::tr1::hash<String>()(x);
2011-09-26 07:25:22 +00:00
}
UInt64 operator() (const Array & x) const
{
throw Exception("Cannot aggregate by array", ErrorCodes::ILLEGAL_KEY_OF_AGGREGATION);
}
UInt64 operator() (const SharedPtr<IAggregateFunction> & x) const
{
throw Exception("Cannot aggregate by state of aggregate function", ErrorCodes::ILLEGAL_KEY_OF_AGGREGATION);
}
};
2011-09-28 05:24:38 +00:00
/** Результат хранится в оперативке и должен полностью помещаться в оперативку.
2011-09-19 01:42:16 +00:00
*/
2011-09-26 07:25:22 +00:00
void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & result)
2011-09-19 01:42:16 +00:00
{
2011-09-25 03:37:09 +00:00
size_t keys_size = keys.empty() ? key_names.size() : keys.size();
2011-09-19 01:42:16 +00:00
size_t aggregates_size = aggregates.size();
Row key(keys_size);
Columns key_columns(keys_size);
typedef std::vector<Columns> AggregateColumns;
AggregateColumns aggregate_columns(aggregates_size);
typedef std::vector<Row> Rows;
Rows aggregate_arguments(aggregates_size);
/// Читаем все данные
while (Block block = stream->read())
{
2011-09-24 20:32:41 +00:00
/// Преобразуем имена столбцов в номера, если номера не заданы
if (keys.empty() && !key_names.empty())
for (Names::const_iterator it = key_names.begin(); it != key_names.end(); ++it)
keys.push_back(block.getPositionByName(*it));
for (AggregateDescriptions::iterator it = aggregates.begin(); it != aggregates.end(); ++it)
if (it->arguments.empty() && !it->argument_names.empty())
for (Names::const_iterator jt = it->argument_names.begin(); jt != it->argument_names.end(); ++jt)
it->arguments.push_back(block.getPositionByName(*jt));
2011-09-25 05:07:47 +00:00
for (size_t i = 0; i < aggregates_size; ++i)
{
aggregate_arguments[i].resize(aggregates[i].arguments.size());
aggregate_columns[i].resize(aggregates[i].arguments.size());
}
2011-09-24 20:32:41 +00:00
2011-09-19 01:42:16 +00:00
/// Запоминаем столбцы, с которыми будем работать
for (size_t i = 0, size = keys_size; i < size; ++i)
key_columns[i] = block.getByPosition(keys[i]).column;
for (size_t i = 0; i < aggregates_size; ++i)
for (size_t j = 0; j < aggregate_columns[i].size(); ++j)
aggregate_columns[i][j] = block.getByPosition(aggregates[i].arguments[j]).column;
2011-09-19 03:34:23 +00:00
/// Создадим пример блока, описывающего результат
if (!sample)
{
for (size_t i = 0, size = keys_size; i < size; ++i)
sample.insert(block.getByPosition(keys[i]).cloneEmpty());
for (size_t i = 0; i < aggregates_size; ++i)
{
ColumnWithNameAndType col;
2011-09-25 03:37:09 +00:00
col.name = aggregates[i].column_name;
2011-09-19 03:34:23 +00:00
col.type = new DataTypeAggregateFunction;
col.column = new ColumnAggregateFunction;
sample.insert(col);
}
2011-09-26 01:50:32 +00:00
/// Вставим в блок результата все столбцы-константы из исходного блока, так как они могут ещё пригодиться.
size_t columns = block.columns();
for (size_t i = 0; i < columns; ++i)
if (block.getByPosition(i).column->isConst())
sample.insert(block.getByPosition(i).cloneEmpty());
2011-09-19 03:34:23 +00:00
}
2011-09-19 01:42:16 +00:00
size_t rows = block.rows();
2011-09-26 07:25:22 +00:00
/// Каким способом выполнять агрегацию?
bool has_strings = false;
for (size_t j = 0; j < keys_size; ++j)
if (dynamic_cast<const ColumnString *>(&*key_columns[j]) || dynamic_cast<const ColumnFixedString *>(&*key_columns[j]))
has_strings = true;
2011-09-26 12:50:50 +00:00
bool keys_fit_128_bits = true;
size_t keys_bytes = 0;
typedef std::vector<size_t> Sizes;
Sizes key_sizes(keys_size);
for (size_t j = 0; j < keys_size; ++j)
{
if (!key_columns[j]->isNumeric())
{
keys_fit_128_bits = false;
break;
}
key_sizes[j] = key_columns[j]->sizeOfField();
keys_bytes += key_sizes[j];
}
if (keys_bytes > 16)
keys_fit_128_bits = false;
2011-09-26 07:25:22 +00:00
if (keys_size == 0)
2011-09-19 01:42:16 +00:00
{
2011-09-26 07:25:22 +00:00
/// Если ключей нет
AggregatedDataWithoutKey & res = result.without_key;
2011-09-26 11:58:35 +00:00
if (res.empty())
{
res.resize(aggregates_size);
for (size_t i = 0; i < aggregates_size; ++i)
res[i] = aggregates[i].function->cloneEmpty();
}
2011-09-26 07:25:22 +00:00
for (size_t i = 0; i < rows; ++i)
{
/// Добавляем значения
for (size_t j = 0; j < aggregates_size; ++j)
{
for (size_t k = 0, size = aggregate_arguments[j].size(); k < size; ++k)
aggregate_arguments[j][k] = (*aggregate_columns[j][k])[i];
res[j]->add(aggregate_arguments[j]);
}
}
}
else if (keys_size == 1 && key_columns[0]->isNumeric()
&& !dynamic_cast<ColumnFloat32 *>(&*key_columns[0]) && !dynamic_cast<ColumnFloat64 *>(&*key_columns[0]))
{
/// Если есть один ключ, который помещается в 64 бита, и это не число с плавающей запятой
AggregatedDataWithUInt64Key & res = result.key64;
2011-09-26 12:50:50 +00:00
const FieldVisitorToUInt64 visitor;
2011-09-26 07:25:22 +00:00
IColumn & column = *key_columns[0];
2011-09-19 01:42:16 +00:00
2011-09-26 07:25:22 +00:00
/// Для всех строчек
for (size_t i = 0; i < rows; ++i)
2011-09-19 01:42:16 +00:00
{
2011-09-26 07:25:22 +00:00
/// Строим ключ
Field field = column[i];
UInt64 key = boost::apply_visitor(visitor, field);
2011-12-19 02:00:40 +00:00
AggregatedDataWithUInt64Key::iterator it;
bool inserted;
res.emplace(key, it, inserted);
if (inserted)
2011-09-26 07:25:22 +00:00
{
2011-12-19 02:00:40 +00:00
new(&it->second) AggregateFunctions(aggregates_size);
2011-09-26 07:25:22 +00:00
for (size_t j = 0; j < aggregates_size; ++j)
it->second[j] = aggregates[j].function->cloneEmpty();
}
/// Добавляем значения
2011-09-19 01:42:16 +00:00
for (size_t j = 0; j < aggregates_size; ++j)
2011-09-26 07:25:22 +00:00
{
for (size_t k = 0, size = aggregate_arguments[j].size(); k < size; ++k)
aggregate_arguments[j][k] = (*aggregate_columns[j][k])[i];
it->second[j]->add(aggregate_arguments[j]);
}
2011-09-19 01:42:16 +00:00
}
2011-09-26 07:25:22 +00:00
}
2011-09-26 15:22:25 +00:00
else if (keys_size == 1
&& (dynamic_cast<ColumnString *>(&*key_columns[0]) || dynamic_cast<ColumnFixedString *>(&*key_columns[0])))
{
/// Если есть один строковый ключ, то используем хэш-таблицу с ним
AggregatedDataWithStringKey & res = result.key_string;
IColumn & column = *key_columns[0];
/// Для всех строчек
for (size_t i = 0; i < rows; ++i)
{
/// Строим ключ
String key = boost::get<String>(column[i]);
AggregatedDataWithStringKey::iterator it = res.find(key);
if (it == res.end())
{
it = res.insert(std::make_pair(key, AggregateFunctions(aggregates_size))).first;
for (size_t j = 0; j < aggregates_size; ++j)
it->second[j] = aggregates[j].function->cloneEmpty();
}
/// Добавляем значения
for (size_t j = 0; j < aggregates_size; ++j)
{
for (size_t k = 0, size = aggregate_arguments[j].size(); k < size; ++k)
aggregate_arguments[j][k] = (*aggregate_columns[j][k])[i];
it->second[j]->add(aggregate_arguments[j]);
}
}
}
2011-09-26 12:50:50 +00:00
else
2011-09-26 07:25:22 +00:00
{
2011-09-26 15:22:25 +00:00
/// Если много ключей - будем агрегировать по хэшу от них
2011-09-26 07:25:22 +00:00
AggregatedDataHashed & res = result.hashed;
2011-09-26 12:50:50 +00:00
const FieldVisitorToUInt64 to_uint64_visitor;
2011-09-19 01:42:16 +00:00
2011-09-26 07:25:22 +00:00
/// Для всех строчек
for (size_t i = 0; i < rows; ++i)
2011-09-19 01:42:16 +00:00
{
2011-09-26 07:25:22 +00:00
/// Строим ключ
union
{
UInt128 key_hash;
unsigned char bytes[16];
} key_hash_union;
2011-09-26 12:50:50 +00:00
/// Если все ключи числовые и помещаются в 128 бит
if (keys_fit_128_bits)
{
memset(key_hash_union.bytes, 0, 16);
size_t offset = 0;
for (size_t j = 0; j < keys_size; ++j)
{
key[j] = (*key_columns[j])[i];
UInt64 tmp = boost::apply_visitor(to_uint64_visitor, key[j]);
/// Работает только на little endian
2011-09-26 13:16:11 +00:00
memcpy(key_hash_union.bytes + offset, reinterpret_cast<const char *>(&tmp), key_sizes[j]);
2011-09-26 12:50:50 +00:00
offset += key_sizes[j];
}
}
else /// Иначе используем md5.
{
2011-09-26 14:06:19 +00:00
FieldVisitorHash key_hash_visitor;
2011-09-26 12:50:50 +00:00
for (size_t j = 0; j < keys_size; ++j)
{
key[j] = (*key_columns[j])[i];
boost::apply_visitor(key_hash_visitor, key[j]);
}
2011-09-26 14:06:19 +00:00
key_hash_visitor.finalize(key_hash_union.bytes);
2011-09-26 12:50:50 +00:00
}
2011-09-26 07:25:22 +00:00
2011-12-19 02:00:40 +00:00
AggregatedDataHashed::iterator it;
bool inserted;
res.emplace(key_hash_union.key_hash, it, inserted);
if (inserted)
2011-09-26 07:25:22 +00:00
{
2011-12-19 02:00:40 +00:00
new(&it->second) AggregatedDataHashed::mapped_type(key, AggregateFunctions(aggregates_size));
2011-09-19 01:42:16 +00:00
2011-09-26 07:25:22 +00:00
for (size_t j = 0; j < aggregates_size; ++j)
it->second.second[j] = aggregates[j].function->cloneEmpty();
}
/// Добавляем значения
for (size_t j = 0; j < aggregates_size; ++j)
{
for (size_t k = 0, size = aggregate_arguments[j].size(); k < size; ++k)
aggregate_arguments[j][k] = (*aggregate_columns[j][k])[i];
it->second.second[j]->add(aggregate_arguments[j]);
}
2011-09-19 01:42:16 +00:00
}
}
2011-09-26 07:25:22 +00:00
/* else
{
/// Общий способ
AggregatedData & res = result.generic;
/// Для всех строчек
for (size_t i = 0; i < rows; ++i)
{
/// Строим ключ
for (size_t j = 0; j < keys_size; ++j)
key[j] = (*key_columns[j])[i];
AggregatedData::iterator it = res.find(key);
if (it == res.end())
{
it = res.insert(std::make_pair(key, AggregateFunctions(aggregates_size))).first;
for (size_t j = 0; j < aggregates_size; ++j)
it->second[j] = aggregates[j].function->cloneEmpty();
}
2011-09-19 01:42:16 +00:00
2011-09-26 07:25:22 +00:00
/// Добавляем значения
for (size_t j = 0; j < aggregates_size; ++j)
{
for (size_t k = 0, size = aggregate_arguments[j].size(); k < size; ++k)
aggregate_arguments[j][k] = (*aggregate_columns[j][k])[i];
it->second[j]->add(aggregate_arguments[j]);
}
}
}*/
}
2011-09-19 01:42:16 +00:00
}
}