Improved Summing/Aggregating streams [#METR-12588].

This commit is contained in:
Alexey Milovidov 2016-04-14 00:15:36 +03:00
parent 054df259cd
commit b664f367d8
5 changed files with 33 additions and 31 deletions

View File

@ -59,15 +59,15 @@ private:
ColumnNumbers column_numbers_to_aggregate;
std::vector<ColumnAggregateFunction *> columns_to_aggregate;
Row current_key; /// Текущий первичный ключ.
Row next_key; /// Первичный ключ следующей строки.
RowRef current_key; /// Текущий первичный ключ.
RowRef next_key; /// Первичный ключ следующей строки.
Row current_row;
RowRef current_row;
/** Делаем поддержку двух разных курсоров - с Collation и без.
* Шаблоны используем вместо полиморфных SortCursor'ов и вызовов виртуальных функций.
*/
template<class TSortCursor>
template <class TSortCursor>
void merge(ColumnPlainPtrs & merged_columns, std::priority_queue<TSortCursor> & queue);
/// Вставить в результат первую строку для текущей группы.
@ -75,7 +75,7 @@ private:
/** Извлечь все состояния аггрегатных функций и объединить с текущей группой.
*/
template<class TSortCursor>
template <class TSortCursor>
void addRow(TSortCursor & cursor)
{
for (size_t i = 0, size = column_numbers_to_aggregate.size(); i < size; ++i)

View File

@ -190,14 +190,6 @@ protected:
}
}
/// Сохранить первичный ключ, на который указывает cursor в row.
template <class TSortCursor>
void setPrimaryKey(Row & row, TSortCursor & cursor)
{
for (size_t i = 0; i < cursor->sort_columns_size; ++i)
cursor->sort_columns[i]->get(cursor->pos, row[i]);
}
template <class TSortCursor>
void setRowRef(RowRef & row_ref, TSortCursor & cursor)
{

View File

@ -79,13 +79,13 @@ private:
/// Найденные вложенные Map таблицы
std::vector<map_description> maps_to_sum;
Row current_key; /// Текущий первичный ключ.
Row next_key; /// Первичный ключ следующей строки.
RowRef current_key; /// Текущий первичный ключ.
RowRef next_key; /// Первичный ключ следующей строки.
Row current_row;
bool current_row_is_zero = false; /// Текущая строчка просуммировалась в ноль, и её следует удалить.
bool current_row_is_zero = true; /// Текущая строчка просуммировалась в ноль, и её следует удалить.
bool output_is_non_empty = false; /// Отдали ли мы наружу хоть одну строку.
bool output_is_non_empty = false; /// Отдали ли мы наружу хоть одну строку.
/** Делаем поддержку двух разных курсоров - с Collation и без.
* Шаблоны используем вместо полиморфных SortCursor'ов и вызовов виртуальных функций.

View File

@ -8,7 +8,7 @@ namespace DB
void AggregatingSortedBlockInputStream::insertCurrentRow(ColumnPlainPtrs & merged_columns)
{
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insert(current_row[i]);
merged_columns[i]->insertFrom(*current_row.columns[i], current_row.row_num);
}
@ -30,9 +30,7 @@ Block AggregatingSortedBlockInputStream::readImpl()
/// Дополнительная инициализация.
if (current_row.empty())
{
current_row.resize(num_columns);
current_key.resize(description.size());
next_key.resize(description.size());
current_row.columns.resize(num_columns);
/// Заполним номера столбцов, которые нужно доагрегировать.
for (size_t i = 0; i < num_columns; ++i)
@ -79,7 +77,15 @@ void AggregatingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns,
{
TSortCursor current = queue.top();
setPrimaryKey(next_key, current);
if (current_key.empty())
{
current_key.columns.resize(description.size());
next_key.columns.resize(description.size());
setPrimaryKeyRef(current_key, current);
}
setPrimaryKeyRef(next_key, current);
bool key_differs = next_key != current_key;
@ -91,11 +97,10 @@ void AggregatingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns,
if (key_differs)
{
current_key = std::move(next_key);
next_key.resize(description.size());
current_key.swap(next_key);
/// Запишем данные для очередной группы.
setRow(current_row, current);
setRowRef(current_row, current);
insertCurrentRow(merged_columns);
++merged_rows;
}

View File

@ -70,8 +70,6 @@ Block SummingSortedBlockInputStream::readImpl()
if (current_row.empty())
{
current_row.resize(num_columns);
current_key.resize(description.size());
next_key.resize(description.size());
std::unordered_map<std::string, std::vector<std::size_t>> discovered_maps;
/** Заполним номера столбцов, которые должны быть просуммированы.
@ -178,7 +176,15 @@ void SummingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, std:
{
TSortCursor current = queue.top();
setPrimaryKey(next_key, current);
if (current_key.empty())
{
current_key.columns.resize(description.size());
next_key.columns.resize(description.size());
setPrimaryKeyRef(current_key, current);
}
setPrimaryKeyRef(next_key, current);
bool key_differs = next_key != current_key;
@ -191,15 +197,14 @@ void SummingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, std:
if (key_differs)
{
/// Запишем данные для предыдущей группы.
if (!current_key[0].isNull() && !current_row_is_zero)
if (!current_row_is_zero)
{
++merged_rows;
output_is_non_empty = true;
insertCurrentRow(merged_columns);
}
current_key = std::move(next_key);
next_key.resize(description.size());
current_key.swap(next_key);
setRow(current_row, current);
current_row_is_zero = false;