mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 08:40:50 +00:00
dbms: development [#CONV-2944].
This commit is contained in:
parent
de9845ee22
commit
6c1a9ede84
@ -43,6 +43,7 @@ struct UInt128Hash
|
|||||||
typedef std::map<Row, AggregateFunctions> AggregatedData;
|
typedef std::map<Row, AggregateFunctions> AggregatedData;
|
||||||
typedef AggregateFunctions AggregatedDataWithoutKey;
|
typedef AggregateFunctions AggregatedDataWithoutKey;
|
||||||
typedef std::tr1::unordered_map<UInt64, AggregateFunctions> AggregatedDataWithUInt64Key;
|
typedef std::tr1::unordered_map<UInt64, AggregateFunctions> AggregatedDataWithUInt64Key;
|
||||||
|
typedef std::tr1::unordered_map<String, AggregateFunctions> AggregatedDataWithStringKey;
|
||||||
typedef std::tr1::unordered_map<UInt128, std::pair<Row, AggregateFunctions>, UInt128Hash> AggregatedDataHashed;
|
typedef std::tr1::unordered_map<UInt128, std::pair<Row, AggregateFunctions>, UInt128Hash> AggregatedDataHashed;
|
||||||
|
|
||||||
|
|
||||||
@ -51,6 +52,7 @@ struct AggregatedDataVariants
|
|||||||
AggregatedData generic;
|
AggregatedData generic;
|
||||||
AggregatedDataWithoutKey without_key;
|
AggregatedDataWithoutKey without_key;
|
||||||
AggregatedDataWithUInt64Key key64;
|
AggregatedDataWithUInt64Key key64;
|
||||||
|
AggregatedDataWithStringKey key_string;
|
||||||
AggregatedDataHashed hashed;
|
AggregatedDataHashed hashed;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -62,6 +62,21 @@ Block AggregatingBlockInputStream::readImpl()
|
|||||||
res.getByPosition(i).column->insert(*jt);
|
res.getByPosition(i).column->insert(*jt);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
else if (!data_variants.key_string.empty())
|
||||||
|
{
|
||||||
|
AggregatedDataWithStringKey & data = data_variants.key_string;
|
||||||
|
rows = data.size();
|
||||||
|
IColumn & first_column = *res.getByPosition(0).column;
|
||||||
|
|
||||||
|
for (AggregatedDataWithStringKey::const_iterator it = data.begin(); it != data.end(); ++it)
|
||||||
|
{
|
||||||
|
first_column.insert(it->first);
|
||||||
|
|
||||||
|
size_t i = 1;
|
||||||
|
for (AggregateFunctions::const_iterator jt = it->second.begin(); jt != it->second.end(); ++jt, ++i)
|
||||||
|
res.getByPosition(i).column->insert(*jt);
|
||||||
|
}
|
||||||
|
}
|
||||||
else if (!data_variants.hashed.empty())
|
else if (!data_variants.hashed.empty())
|
||||||
{
|
{
|
||||||
AggregatedDataHashed & data = data_variants.hashed;
|
AggregatedDataHashed & data = data_variants.hashed;
|
||||||
|
@ -29,34 +29,34 @@ public:
|
|||||||
|
|
||||||
void operator() (const Null & x)
|
void operator() (const Null & x)
|
||||||
{
|
{
|
||||||
int type = FieldType::Null;
|
char type = FieldType::Null;
|
||||||
MD5_Update(&state, reinterpret_cast<const char *>(&type), sizeof(type));
|
MD5_Update(&state, reinterpret_cast<const char *>(&type), sizeof(type));
|
||||||
}
|
}
|
||||||
|
|
||||||
void operator() (const UInt64 & x)
|
void operator() (const UInt64 & x)
|
||||||
{
|
{
|
||||||
int type = FieldType::UInt64;
|
char type = FieldType::UInt64;
|
||||||
MD5_Update(&state, reinterpret_cast<const char *>(&type), sizeof(type));
|
MD5_Update(&state, reinterpret_cast<const char *>(&type), sizeof(type));
|
||||||
MD5_Update(&state, reinterpret_cast<const char *>(&x), sizeof(x));
|
MD5_Update(&state, reinterpret_cast<const char *>(&x), sizeof(x));
|
||||||
}
|
}
|
||||||
|
|
||||||
void operator() (const Int64 & x)
|
void operator() (const Int64 & x)
|
||||||
{
|
{
|
||||||
int type = FieldType::Int64;
|
char type = FieldType::Int64;
|
||||||
MD5_Update(&state, reinterpret_cast<const char *>(&type), sizeof(type));
|
MD5_Update(&state, reinterpret_cast<const char *>(&type), sizeof(type));
|
||||||
MD5_Update(&state, reinterpret_cast<const char *>(&x), sizeof(x));
|
MD5_Update(&state, reinterpret_cast<const char *>(&x), sizeof(x));
|
||||||
}
|
}
|
||||||
|
|
||||||
void operator() (const Float64 & x)
|
void operator() (const Float64 & x)
|
||||||
{
|
{
|
||||||
int type = FieldType::Float64;
|
char type = FieldType::Float64;
|
||||||
MD5_Update(&state, reinterpret_cast<const char *>(&type), sizeof(type));
|
MD5_Update(&state, reinterpret_cast<const char *>(&type), sizeof(type));
|
||||||
MD5_Update(&state, reinterpret_cast<const char *>(&x), sizeof(x));
|
MD5_Update(&state, reinterpret_cast<const char *>(&x), sizeof(x));
|
||||||
}
|
}
|
||||||
|
|
||||||
void operator() (const String & x)
|
void operator() (const String & x)
|
||||||
{
|
{
|
||||||
int type = FieldType::String;
|
char type = FieldType::String;
|
||||||
MD5_Update(&state, reinterpret_cast<const char *>(&type), sizeof(type));
|
MD5_Update(&state, reinterpret_cast<const char *>(&type), sizeof(type));
|
||||||
MD5_Update(&state, x.data(), x.size());
|
MD5_Update(&state, x.data(), x.size());
|
||||||
}
|
}
|
||||||
@ -73,7 +73,7 @@ public:
|
|||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
/** Преобразование значения в 64 бита; если значение - строка, то используется относительно стойкая хэш-функция. */
|
/** Преобразование значения в 64 бита. Для чисел - однозначное, для строк - некриптографический хэш. */
|
||||||
class FieldVisitorToUInt64 : public boost::static_visitor<UInt64>
|
class FieldVisitorToUInt64 : public boost::static_visitor<UInt64>
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
@ -87,10 +87,10 @@ public:
|
|||||||
memcpy(reinterpret_cast<char *>(&res), reinterpret_cast<const char *>(&x), sizeof(x));
|
memcpy(reinterpret_cast<char *>(&res), reinterpret_cast<const char *>(&x), sizeof(x));
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
UInt64 operator() (const String & x) const
|
UInt64 operator() (const String & x) const
|
||||||
{
|
{
|
||||||
throw Exception("Cannot aggregate by string", ErrorCodes::ILLEGAL_KEY_OF_AGGREGATION);
|
return std::tr1::hash<String>()(x);
|
||||||
}
|
}
|
||||||
|
|
||||||
UInt64 operator() (const Array & x) const
|
UInt64 operator() (const Array & x) const
|
||||||
@ -254,9 +254,41 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
else if (keys_size == 1
|
||||||
|
&& (dynamic_cast<ColumnString *>(&*key_columns[0]) || dynamic_cast<ColumnFixedString *>(&*key_columns[0])))
|
||||||
|
{
|
||||||
|
/// Если есть один строковый ключ, то используем хэш-таблицу с ним
|
||||||
|
AggregatedDataWithStringKey & res = result.key_string;
|
||||||
|
IColumn & column = *key_columns[0];
|
||||||
|
|
||||||
|
/// Для всех строчек
|
||||||
|
for (size_t i = 0; i < rows; ++i)
|
||||||
|
{
|
||||||
|
/// Строим ключ
|
||||||
|
String key = boost::get<String>(column[i]);
|
||||||
|
|
||||||
|
AggregatedDataWithStringKey::iterator it = res.find(key);
|
||||||
|
if (it == res.end())
|
||||||
|
{
|
||||||
|
it = res.insert(std::make_pair(key, AggregateFunctions(aggregates_size))).first;
|
||||||
|
|
||||||
|
for (size_t j = 0; j < aggregates_size; ++j)
|
||||||
|
it->second[j] = aggregates[j].function->cloneEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Добавляем значения
|
||||||
|
for (size_t j = 0; j < aggregates_size; ++j)
|
||||||
|
{
|
||||||
|
for (size_t k = 0, size = aggregate_arguments[j].size(); k < size; ++k)
|
||||||
|
aggregate_arguments[j][k] = (*aggregate_columns[j][k])[i];
|
||||||
|
|
||||||
|
it->second[j]->add(aggregate_arguments[j]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
/// Если есть строки - будем агрегировать по хэшу от них
|
/// Если много ключей - будем агрегировать по хэшу от них
|
||||||
AggregatedDataHashed & res = result.hashed;
|
AggregatedDataHashed & res = result.hashed;
|
||||||
const FieldVisitorToUInt64 to_uint64_visitor;
|
const FieldVisitorToUInt64 to_uint64_visitor;
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user