dbms: Aggregator: improvement [#CONV-2944].

This commit is contained in:
Alexey Milovidov 2013-02-16 20:15:45 +00:00
parent 9e4fd68e6c
commit 7b790fb2a8
12 changed files with 62 additions and 14 deletions

View File

@ -76,6 +76,11 @@ public:
{
throw Exception("Method getDataAt is not supported for ColumnAggregateFunction. You must access underlying vector directly.", ErrorCodes::NOT_IMPLEMENTED);
}
void insertData(const char * pos, size_t length)
{
throw Exception("Method insertData is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
void cut(size_t start, size_t length)
{

View File

@ -73,6 +73,11 @@ public:
throw Exception("Method getDataAt is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
void insertData(const char * pos, size_t length)
{
throw Exception("Method insertData is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
void cut(size_t start, size_t length)
{
if (length == 0)

View File

@ -47,6 +47,11 @@ public:
{
throw Exception("Cannot insert element into constant column " + getName(), ErrorCodes::CANNOT_INSERT_ELEMENT_INTO_CONSTANT_COLUMN);
}
void insertData(const char * pos, size_t length)
{
throw Exception("Cannot insert element into constant column " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
void insertDefault() { ++s; }

View File

@ -59,6 +59,11 @@ public:
throw Exception("Method getDataAt is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
void insertData(const char * pos, size_t length)
{
throw Exception("Method insertData is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
void cut(size_t start, size_t length)
{
data->cut(n * start, n * length);

View File

@ -72,6 +72,13 @@ public:
memcpy(&char_data[old_size], &src.char_data[n * index], n);
}
void insertData(const char * pos, size_t length)
{
size_t old_size = char_data.size();
char_data.resize(old_size + n);
memcpy(&char_data[old_size], pos, n);
}
void insertDefault()
{
char_data.resize(char_data.size() + n);

View File

@ -28,6 +28,7 @@ public:
size_t byteSize() const { return 0; }
int compareAt(size_t n, size_t m, const IColumn & rhs_) const { return 0; }
StringRef getDataAt(size_t n) const { throw Exception("Method getDataAt is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED); }
void insertData(const char * pos, size_t length) { throw Exception("Method insertData is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED); }
void filter(const Filter & filt)
{

View File

@ -73,6 +73,15 @@ public:
getOffsets().push_back((getOffsets().size() == 0 ? 0 : getOffsets().back()) + size_to_append);
}
void insertData(const char * pos, size_t length)
{
size_t old_size = char_data.size();
char_data.resize(old_size + length);
memcpy(&char_data[old_size], pos, length);
getOffsets().push_back((getOffsets().size() == 0 ? 0 : getOffsets().back()) + length);
}
void filter(const Filter & filt)
{
size_t size = getOffsets().size();

View File

@ -64,6 +64,11 @@ public:
throw Exception("Method getDataAt is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
void insertData(const char * pos, size_t length)
{
throw Exception("Method insertData is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
void insert(const Field & x)
{
const Array & arr = DB::get<const Array &>(x);

View File

@ -62,6 +62,11 @@ public:
data.push_back(static_cast<const ColumnVectorBase<T> &>(src).getData()[n]);
}
void insertData(const char * pos, size_t length)
{
data.push_back(*reinterpret_cast<const T *>(pos));
}
void insertDefault()
{
data.push_back(T());

View File

@ -80,6 +80,14 @@ public:
*/
virtual void insertFrom(const IColumn & src, size_t n) { insert(src[n]); }
/** Вставить данные, расположенные в указанном куске памяти, если возможно.
* (если не реализуемо - кидает исключение)
* Используется для оптимизации некоторых вычислений (например, агрегации).
* Строки переменной длины передаются с нулём на конце.
* В случае данных постоянной длины, параметр length может игнорироваться.
*/
virtual void insertData(const char * pos, size_t length) = 0;
/** Вставить значение "по умолчанию".
* Используется, когда нужно увеличить размер столбца, но значение не имеет смысла.
* Например, для ColumnNullable, если взведён флаг null, то соответствующее значение во вложенном столбце игнорируется.

View File

@ -79,7 +79,7 @@ struct AggregatedDataVariants
/// Специализация для случая, когда ключи отсутствуют.
AggregatedDataWithoutKey without_key;
/// Специализация для случая, когда есть один числовой ключ (не с плавающей запятой).
/// Специализация для случая, когда есть один числовой ключ.
AggregatedDataWithUInt64Key key64;
/// Специализация для случая, когда есть один строковый ключ.

View File

@ -130,10 +130,8 @@ AggregatedDataVariants::Type Aggregator::chooseAggregationMethod(const ConstColu
if (keys_size == 0)
return AggregatedDataVariants::WITHOUT_KEY;
/// Если есть один ключ, который помещается в 64 бита, и это не число с плавающей запятой
if (keys_size == 1 && key_columns[0]->isNumeric()
&& !dynamic_cast<const ColumnFloat32 *>(key_columns[0]) && !dynamic_cast<const ColumnFloat64 *>(key_columns[0])
&& !dynamic_cast<const ColumnConstFloat32 *>(key_columns[0]) && !dynamic_cast<const ColumnConstFloat64 *>(key_columns[0]))
/// Если есть один ключ, который помещается в 64 бита
if (keys_size == 1 && key_columns[0]->isNumeric())
return AggregatedDataVariants::KEY_64;
/// Если есть один строковый ключ, то используем хэш-таблицу с ним
@ -298,7 +296,7 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re
for (size_t i = 0; i < rows; ++i)
{
/// Строим ключ
StringRef ref(&data[i == 0 ? 0 : offsets[i - 1]], (i == 0 ? offsets[i] : (offsets[i] - offsets[i - 1])) - 1);
StringRef ref(&data[i == 0 ? 0 : offsets[i - 1]], i == 0 ? offsets[i] : (offsets[i] - offsets[i - 1]));
AggregatedDataWithStringKey::iterator it;
bool inserted;
@ -538,16 +536,11 @@ Block Aggregator::convertToBlock(AggregatedDataVariants & data_variants)
AggregatedDataWithUInt64Key & data = data_variants.key64;
IColumn & first_column = *key_columns[0];
bool is_signed = dynamic_cast<ColumnInt8 *>(&first_column) || dynamic_cast<ColumnInt16 *>(&first_column)
|| dynamic_cast<ColumnInt32 *>(&first_column) || dynamic_cast<ColumnInt64 *>(&first_column);
size_t j = 0;
for (AggregatedDataWithUInt64Key::const_iterator it = data.begin(); it != data.end(); ++it, ++j)
{
if (is_signed)
first_column.insert(static_cast<Int64>(it->first));
else
first_column.insert(it->first);
first_column.insertData(reinterpret_cast<const char *>(&it->first), sizeof(it->first));
for (size_t i = 0; i < aggregates_size; ++i)
(*aggregate_columns[i])[j] = it->second + offsets_of_aggregate_states[i];
@ -561,7 +554,7 @@ Block Aggregator::convertToBlock(AggregatedDataVariants & data_variants)
size_t j = 0;
for (AggregatedDataWithStringKey::const_iterator it = data.begin(); it != data.end(); ++it, ++j)
{
first_column.insert(String(it->first.data, it->first.size)); /// Здесь можно ускорить, сделав метод insertFrom(const char *, size_t size).
first_column.insertData(it->first.data, it->first.size);
for (size_t i = 0; i < aggregates_size; ++i)
(*aggregate_columns[i])[j] = it->second + offsets_of_aggregate_states[i];
@ -869,7 +862,7 @@ void Aggregator::merge(BlockInputStreamPtr stream, AggregatedDataVariants & resu
for (size_t i = 0; i < rows; ++i)
{
/// Строим ключ
StringRef ref(&data[i == 0 ? 0 : offsets[i - 1]], (i == 0 ? offsets[i] : (offsets[i] - offsets[i - 1])) - 1);
StringRef ref(&data[i == 0 ? 0 : offsets[i - 1]], i == 0 ? offsets[i] : (offsets[i] - offsets[i - 1]));
AggregatedDataWithStringKey::iterator it;
bool inserted;