dbms: improved performance (less copies of fields) [#CONV-2944].

This commit is contained in:
Alexey Milovidov 2013-01-07 06:47:15 +00:00
parent bf32223096
commit 865130d3b6
16 changed files with 107 additions and 32 deletions

View File

@ -24,6 +24,11 @@ public:
return data[n];
}
/// Assigns the n-th element of `data` to the caller-supplied `res`.
/// No bounds check is performed here on `n`.
void get(size_t n, Field & res) const
{
res = data[n];
}
StringRef getDataAt(size_t n) const
{
throw Exception("Method getDataAt is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED);
@ -49,7 +54,7 @@ public:
void insert(const Field & x)
{
data.push_back(get<const AggregateFunctionPtr &>(x));
data.push_back(DB::get<const AggregateFunctionPtr &>(x));
}
int compareAt(size_t n, size_t m, const IColumn & rhs_) const

View File

@ -57,6 +57,17 @@ public:
return res;
}
void get(size_t n, Field & res) const
{
size_t offset = offsetAt(n);
size_t size = sizeAt(n);
res = Array(size);
Array & res_arr = DB::get<Array &>(res);
for (size_t i = 0; i < size; ++i)
data->get(offset + i, res_arr[i]);
}
StringRef getDataAt(size_t n) const
{
throw Exception("Method getDataAt is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED);
@ -95,7 +106,7 @@ public:
void insert(const Field & x)
{
const Array & array = get<const Array &>(x);
const Array & array = DB::get<const Array &>(x);
size_t size = array.size();
for (size_t i = 0; i < size; ++i)
data->insert(array[i]);

View File

@ -39,6 +39,7 @@ public:
/// Returns a new ColumnConst constructed from 0 and the current `data`.
ColumnPtr cloneEmpty() const { return new ColumnConst(0, data); }
/// Returns `s`.
size_t size() const { return s; }
/// Returns `data` converted to its NearestFieldType; `n` is not used.
Field operator[](size_t n) const { return typename NearestFieldType<T>::Type(data); }
/// Same value as operator[], but assigned to the caller-supplied `res`; `n` is not used.
void get(size_t n, Field & res) const { res = typename NearestFieldType<T>::Type(data); }
/// Sets `s` to `length`; `start` is not used.
void cut(size_t start, size_t length) { s = length; }
/// Sets `s` to 0; `data` is left untouched.
void clear() { s = 0; }

View File

@ -46,6 +46,14 @@ public:
return res;
}
/** Writes the fixed-size array at row `index` into `res`.
  * `res` becomes an Array of `n` elements; element i is taken from position
  * n * index + i of the nested column.
  *
  * The loop index is relative to the row (0 .. n-1). The earlier version started
  * it at n * index and then added n * index again, which read the nested column
  * at a doubled offset and wrote past the end of the n-element result array for
  * every index > 0.
  */
void get(size_t index, Field & res) const
{
    res = Array(n);
    Array & res_arr = DB::get<Array &>(res);

    const size_t offset = n * index;
    for (size_t i = 0; i < n; ++i)
        data->get(offset + i, res_arr[i]);
}
StringRef getDataAt(size_t n) const
{
throw Exception("Method getDataAt is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED);
@ -58,7 +66,7 @@ public:
void insert(const Field & x)
{
const Array & array = get<const Array &>(x);
const Array & array = DB::get<const Array &>(x);
if (n != array.size())
throw Exception("Size of array doesn't match size of FixedArray column",
ErrorCodes::SIZE_OF_ARRAY_DOESNT_MATCH_SIZE_OF_FIXEDARRAY_COLUMN);

View File

@ -38,6 +38,11 @@ public:
return String(reinterpret_cast<const char *>(&char_data[n * index]), n);
}
/// Stores the `n` bytes starting at `char_data[n * index]` into `res` as a string,
/// via Field::assignString.
void get(size_t index, Field & res) const
{
res.assignString(reinterpret_cast<const char *>(&char_data[n * index]), n);
}
StringRef getDataAt(size_t index) const
{
return StringRef(&char_data[n * index], n);
@ -45,7 +50,7 @@ public:
void insert(const Field & x)
{
const String & s = get<const String &>(x);
const String & s = DB::get<const String &>(x);
if (s.size() > n)
throw Exception("Too large string '" + s + "' for FixedString column", ErrorCodes::TOO_LARGE_STRING_SIZE);

View File

@ -20,6 +20,7 @@ public:
/// Returns a new ColumnSet constructed from 0 and NULL.
ColumnPtr cloneEmpty() const { return new ColumnSet(0, NULL); }
/// Returns `s`.
size_t size() const { return s; }
/// Not supported: always throws NOT_IMPLEMENTED.
Field operator[](size_t n) const { throw Exception("Cannot get value from ColumnSet", ErrorCodes::NOT_IMPLEMENTED); }
/// Not supported: always throws NOT_IMPLEMENTED (same as operator[]).
void get(size_t n, Field & res) const { throw Exception("Cannot get value from ColumnSet", ErrorCodes::NOT_IMPLEMENTED); }
/// Sets `s` to `length`; `start` is not used.
void cut(size_t start, size_t length) { s = length; }
/// Sets `s` to 0 and resets `data` to NULL.
void clear() { s = 0; data = NULL; }
/// Not supported: always throws NOT_IMPLEMENTED.
void insert(const Field & x) { throw Exception("Cannot insert element into ColumnSet", ErrorCodes::NOT_IMPLEMENTED); }

View File

@ -37,6 +37,11 @@ public:
return Field(&char_data[offsetAt(n)], sizeAt(n) - 1);
}
/// Stores the n-th string into `res` via Field::assignString.
/// The copied length is sizeAt(n) - 1, i.e. the last stored byte is left out
/// (presumably a terminating zero byte — TODO confirm).
void get(size_t n, Field & res) const
{
res.assignString(&char_data[offsetAt(n)], sizeAt(n) - 1);
}
StringRef getDataAt(size_t n) const
{
return StringRef(&char_data[offsetAt(n)], sizeAt(n));
@ -44,7 +49,7 @@ public:
void insert(const Field & x)
{
const String & s = get<const String &>(x);
const String & s = DB::get<const String &>(x);
size_t old_size = char_data.size();
size_t size_to_append = s.size() + 1;

View File

@ -50,6 +50,15 @@ public:
return res;
}
void get(size_t n, Field & res) const
{
size_t size = columns.size();
res = Array(size);
Array & res_arr = DB::get<Array &>(res);
for (size_t i = 0; i < size; ++i)
columns[i]->get(n, res_arr[i]);
}
StringRef getDataAt(size_t n) const
{
throw Exception("Method getDataAt is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED);
@ -57,7 +66,7 @@ public:
void insert(const Field & x)
{
const Array & arr = get<const Array &>(x);
const Array & arr = DB::get<const Array &>(x);
size_t size = columns.size();
if (arr.size() != size)

View File

@ -90,6 +90,11 @@ public:
return typename NearestFieldType<T>::Type(data[n]);
}
/// Converts `data[n]` to NearestFieldType<T>::Type and assigns it to the caller-supplied `res`.
void get(size_t n, Field & res) const
{
res = typename NearestFieldType<T>::Type(data[n]);
}
StringRef getDataAt(size_t n) const
{
return StringRef(reinterpret_cast<const char *>(&data[n]), sizeof(data[n]));

View File

@ -54,6 +54,10 @@ public:
*/
virtual Field operator[](size_t n) const = 0;
/** Same as operator[], but writes the value into the caller-supplied `res`, which avoids
  * an extra copy when, for example, the Field is being put into a container.
*/
virtual void get(size_t n, Field & res) const = 0;
/** Получить кусок памяти, в котором хранится значение, если возможно.
* (если не реализуемо - кидает исключение)
* Используется для оптимизации некоторых вычислений (например, агрегации).

View File

@ -45,7 +45,9 @@ public:
UInt64 = 1,
Int64 = 2,
Float64 = 3,
/// не POD типы
String = 16,
AggregateFunction = 17, /// Состояние агрегатной функции
Array = 18,
@ -108,17 +110,25 @@ public:
/// Создать строку inplace.
Field(const char * data, size_t size)
: which(Types::String)
{
String * __attribute__((__may_alias__)) ptr = reinterpret_cast<String*>(storage);
new (ptr) String(data, size);
create(data, size);
}
Field(const unsigned char * data, size_t size)
: which(Types::String)
{
String * __attribute__((__may_alias__)) ptr = reinterpret_cast<String*>(storage);
new (ptr) String(reinterpret_cast<const char *>(data), size);
create(data, size);
}
/// Replaces the current value with a String built from `size` bytes at `data`:
/// first destroys whatever the Field holds, then constructs the string via create().
/// NOTE(review): destroy() runs before `data` is copied, so `data` presumably must not
/// point into this Field's own current string — confirm with callers.
void assignString(const char * data, size_t size)
{
destroy();
create(data, size);
}
/// Overload for `const unsigned char *` data: destroys the current value, then
/// constructs the string via create().
/// NOTE(review): destroy() runs before `data` is copied, so `data` presumably must not
/// point into this Field's own current string — confirm with callers.
void assignString(const unsigned char * data, size_t size)
{
destroy();
create(data, size);
}
template <typename T>
@ -301,16 +311,25 @@ private:
}
}
/// Makes this Field hold a String built from `size` bytes starting at `data`.
/// Sets `which` to Types::String and constructs the String with placement new in `storage`.
/// Nothing previously held in `storage` is destroyed here; assignString() calls destroy() first.
void create(const char * data, size_t size)
{
which = Types::String;
/// `storage` is accessed through a String pointer, hence the may_alias attribute.
String * __attribute__((__may_alias__)) ptr = reinterpret_cast<String*>(storage);
new (ptr) String(data, size);
}
/// Overload for `const unsigned char *` data: reinterprets the pointer as `const char *`
/// and forwards to create(const char *, size_t).
void create(const unsigned char * data, size_t size)
{
create(reinterpret_cast<const char *>(data), size);
}
void destroy()
{
// std::cerr << this << " Destroying " << getTypeName() << std::endl;
switch (which)
{
default:
break;
case Types::String:
destroy<String>();
break;
@ -320,6 +339,8 @@ private:
case Types::Array:
destroy<Array>();
break;
default:
break;
}
}

View File

@ -60,14 +60,14 @@ private:
void setRow(Row & row, SortCursor & cursor)
{
for (size_t i = 0; i < num_columns; ++i)
row[i] = (*cursor->all_columns[i])[cursor->pos];
cursor->all_columns[i]->get(cursor->pos, row[i]);
}
/// Сохранить первичный ключ, на который указывает cursor в row.
void setPrimaryKey(Row & row, SortCursor & cursor)
{
for (size_t i = 0; i < cursor->sort_columns_size; ++i)
row[i] = (*cursor->sort_columns[i])[cursor->pos];
cursor->sort_columns[i]->get(cursor->pos, row[i]);
}
/// Вставить в результат строки для текущего идентификатора "визита".

View File

@ -98,7 +98,7 @@ inline UInt128 __attribute__((__always_inline__)) pack128(
size_t offset = 0;
for (size_t j = 0; j < keys_size; ++j)
{
key[j] = (*key_columns[j])[i];
key_columns[j]->get(i, key[j]);
StringRef key_data = key_columns[j]->getDataAt(i);
memcpy(key_hash_union.bytes + offset, key_data.data, key_sizes[j]);
offset += key_sizes[j];
@ -110,7 +110,7 @@ inline UInt128 __attribute__((__always_inline__)) pack128(
for (size_t j = 0; j < keys_size; ++j)
{
key[j] = (*key_columns[j])[i];
key_columns[j]->get(i, key[j]);
StringRef key_data = key_columns[j]->getDataAt(i);
hash.update(key_data.data, key_data.size);
}

View File

@ -26,7 +26,7 @@ bool RowInputStreamFromBlockInputStream::read(Row & row)
row.resize(columns);
for (size_t i = 0; i < columns; ++i)
row[i] = (*current_block.getByPosition(i).column)[pos];
current_block.getByPosition(i).column->get(pos, row[i]);
++pos;
return true;

View File

@ -121,7 +121,7 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re
typedef std::vector<Columns> AggregateColumns;
AggregateColumns aggregate_columns(aggregates_size);
typedef std::vector<Row> Rows;
typedef AutoArray<Row> Rows;
Rows aggregate_arguments(aggregates_size);
/** Используется, если есть ограничение на максимальное количество строк при агрегации,
@ -179,7 +179,7 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re
for (size_t j = 0; j < aggregates_size; ++j)
{
for (size_t k = 0, size = aggregate_arguments[j].size(); k < size; ++k)
aggregate_arguments[j][k] = (*aggregate_columns[j][k])[i];
aggregate_columns[j][k]->get(i, aggregate_arguments[j][k]);
res[j]->add(aggregate_arguments[j]);
}
@ -224,7 +224,7 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re
for (size_t j = 0; j < aggregates_size; ++j)
{
for (size_t k = 0, size = aggregate_arguments[j].size(); k < size; ++k)
aggregate_arguments[j][k] = (*aggregate_columns[j][k])[i];
aggregate_columns[j][k]->get(i, aggregate_arguments[j][k]);
it->second[j]->add(aggregate_arguments[j]);
}
@ -272,7 +272,7 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re
for (size_t j = 0; j < aggregates_size; ++j)
{
for (size_t k = 0, size = aggregate_arguments[j].size(); k < size; ++k)
aggregate_arguments[j][k] = (*aggregate_columns[j][k])[i];
aggregate_columns[j][k]->get(i, aggregate_arguments[j][k]);
it->second[j]->add(aggregate_arguments[j]);
}
@ -315,7 +315,7 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re
for (size_t j = 0; j < aggregates_size; ++j)
{
for (size_t k = 0, size = aggregate_arguments[j].size(); k < size; ++k)
aggregate_arguments[j][k] = (*aggregate_columns[j][k])[i];
aggregate_columns[j][k]->get(i, aggregate_arguments[j][k]);
it->second[j]->add(aggregate_arguments[j]);
}
@ -358,7 +358,7 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re
for (size_t j = 0; j < aggregates_size; ++j)
{
for (size_t k = 0, size = aggregate_arguments[j].size(); k < size; ++k)
aggregate_arguments[j][k] = (*aggregate_columns[j][k])[i];
aggregate_columns[j][k]->get(i, aggregate_arguments[j][k]);
it->second.second[j]->add(aggregate_arguments[j]);
}
@ -374,7 +374,7 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re
{
/// Строим ключ
for (size_t j = 0; j < keys_size; ++j)
key[j] = (*key_columns[j])[i];
key_columns[j]->get(i, key[j]);
AggregatedData::iterator it = res.find(key);
if (it == res.end())
@ -393,7 +393,7 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re
for (size_t j = 0; j < aggregates_size; ++j)
{
for (size_t k = 0, size = aggregate_arguments[j].size(); k < size; ++k)
aggregate_arguments[j][k] = (*aggregate_columns[j][k])[i];
aggregate_columns[j][k]->get(i, aggregate_arguments[j][k]);
it->second[j]->add(aggregate_arguments[j]);
}
@ -838,7 +838,7 @@ void Aggregator::merge(BlockInputStreamPtr stream, AggregatedDataVariants & resu
{
/// Строим ключ
for (size_t j = 0; j < keys_size; ++j)
key[j] = (*key_columns[j])[i];
key_columns[j]->get(i, key[j]);
AggregatedData::iterator it = res.find(key);
if (it == res.end())

View File

@ -169,7 +169,7 @@ void Set::create(BlockInputStreamPtr stream)
{
/// Строим ключ
for (size_t j = 0; j < keys_size; ++j)
key[j] = (*key_columns[j])[i];
key_columns[j]->get(i, key[j]);
res.insert(key);
key.resize(keys_size);
@ -369,7 +369,7 @@ void Set::execute(Block & block, const ColumnNumbers & arguments, size_t result,
{
/// Строим ключ
for (size_t j = 0; j < keys_size; ++j)
key[j] = (*key_columns[j])[i];
key_columns[j]->get(i, key[j]);
vec_res[i] = negative ^ (set.end() != set.find(key));
}