diff --git a/dbms/include/DB/DataStreams/AggregatingSortedBlockInputStream.h b/dbms/include/DB/DataStreams/AggregatingSortedBlockInputStream.h index 4ce623b774a..a09e8a7bb82 100644 --- a/dbms/include/DB/DataStreams/AggregatingSortedBlockInputStream.h +++ b/dbms/include/DB/DataStreams/AggregatingSortedBlockInputStream.h @@ -59,15 +59,15 @@ private: ColumnNumbers column_numbers_to_aggregate; std::vector columns_to_aggregate; - Row current_key; /// Текущий первичный ключ. - Row next_key; /// Первичный ключ следующей строки. + RowRef current_key; /// Текущий первичный ключ. + RowRef next_key; /// Первичный ключ следующей строки. - Row current_row; + RowRef current_row; /** Делаем поддержку двух разных курсоров - с Collation и без. * Шаблоны используем вместо полиморфных SortCursor'ов и вызовов виртуальных функций. */ - template + template void merge(ColumnPlainPtrs & merged_columns, std::priority_queue & queue); /// Вставить в результат первую строку для текущей группы. @@ -75,7 +75,7 @@ private: /** Извлечь все состояния аггрегатных функций и объединить с текущей группой. */ - template + template void addRow(TSortCursor & cursor) { for (size_t i = 0, size = column_numbers_to_aggregate.size(); i < size; ++i) diff --git a/dbms/include/DB/DataStreams/MergingSortedBlockInputStream.h b/dbms/include/DB/DataStreams/MergingSortedBlockInputStream.h index deb396f5d5f..352bf5f49cc 100644 --- a/dbms/include/DB/DataStreams/MergingSortedBlockInputStream.h +++ b/dbms/include/DB/DataStreams/MergingSortedBlockInputStream.h @@ -190,14 +190,6 @@ protected: } } - /// Сохранить первичный ключ, на который указывает cursor в row. - template - void setPrimaryKey(Row & row, TSortCursor & cursor) - { - for (size_t i = 0; i < cursor->sort_columns_size; ++i) - cursor->sort_columns[i]->get(cursor->pos, row[i]); - } - template void setRowRef(RowRef & row_ref, TSortCursor & cursor) { diff --git a/dbms/include/DB/DataStreams/SummingSortedBlockInputStream.h b/dbms/include/DB/DataStreams/SummingSortedBlockInputStream.h index 9f7d84b18e8..ec28a134b93 100644 --- a/dbms/include/DB/DataStreams/SummingSortedBlockInputStream.h +++ b/dbms/include/DB/DataStreams/SummingSortedBlockInputStream.h @@ -79,13 +79,13 @@ private: /// Найденные вложенные Map таблицы std::vector maps_to_sum; - Row current_key; /// Текущий первичный ключ. - Row next_key; /// Первичный ключ следующей строки. + RowRef current_key; /// Текущий первичный ключ. + RowRef next_key; /// Первичный ключ следующей строки. Row current_row; - bool current_row_is_zero = false; /// Текущая строчка просуммировалась в ноль, и её следует удалить. + bool current_row_is_zero = true; /// Текущая строчка просуммировалась в ноль, и её следует удалить. - bool output_is_non_empty = false; /// Отдали ли мы наружу хоть одну строку. + bool output_is_non_empty = false; /// Отдали ли мы наружу хоть одну строку. /** Делаем поддержку двух разных курсоров - с Collation и без. * Шаблоны используем вместо полиморфных SortCursor'ов и вызовов виртуальных функций. diff --git a/dbms/src/DataStreams/AggregatingSortedBlockInputStream.cpp b/dbms/src/DataStreams/AggregatingSortedBlockInputStream.cpp index 2e696f5a81b..66f3cfd3137 100644 --- a/dbms/src/DataStreams/AggregatingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/AggregatingSortedBlockInputStream.cpp @@ -8,7 +8,7 @@ namespace DB void AggregatingSortedBlockInputStream::insertCurrentRow(ColumnPlainPtrs & merged_columns) { for (size_t i = 0; i < num_columns; ++i) - merged_columns[i]->insert(current_row[i]); + merged_columns[i]->insertFrom(*current_row.columns[i], current_row.row_num); } @@ -30,9 +30,7 @@ Block AggregatingSortedBlockInputStream::readImpl() /// Дополнительная инициализация. if (current_row.empty()) { - current_row.resize(num_columns); - current_key.resize(description.size()); - next_key.resize(description.size()); + current_row.columns.resize(num_columns); /// Заполним номера столбцов, которые нужно доагрегировать. for (size_t i = 0; i < num_columns; ++i) @@ -79,7 +77,15 @@ void AggregatingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, { TSortCursor current = queue.top(); - setPrimaryKey(next_key, current); + if (current_key.empty()) + { + current_key.columns.resize(description.size()); + next_key.columns.resize(description.size()); + + setPrimaryKeyRef(current_key, current); + } + + setPrimaryKeyRef(next_key, current); bool key_differs = next_key != current_key; @@ -91,11 +97,10 @@ void AggregatingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, if (key_differs) { - current_key = std::move(next_key); - next_key.resize(description.size()); + current_key.swap(next_key); /// Запишем данные для очередной группы. - setRow(current_row, current); + setRowRef(current_row, current); insertCurrentRow(merged_columns); ++merged_rows; } diff --git a/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp b/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp index b879be47d1d..46e1d367a1d 100644 --- a/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp @@ -70,8 +70,6 @@ Block SummingSortedBlockInputStream::readImpl() if (current_row.empty()) { current_row.resize(num_columns); - current_key.resize(description.size()); - next_key.resize(description.size()); std::unordered_map> discovered_maps; /** Заполним номера столбцов, которые должны быть просуммированы. @@ -178,7 +176,15 @@ void SummingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, std: { TSortCursor current = queue.top(); - setPrimaryKey(next_key, current); + if (current_key.empty()) + { + current_key.columns.resize(description.size()); + next_key.columns.resize(description.size()); + + setPrimaryKeyRef(current_key, current); + } + + setPrimaryKeyRef(next_key, current); bool key_differs = next_key != current_key; @@ -191,15 +197,14 @@ void SummingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, std: if (key_differs) { /// Запишем данные для предыдущей группы. - if (!current_key[0].isNull() && !current_row_is_zero) + if (!current_row_is_zero) { ++merged_rows; output_is_non_empty = true; insertCurrentRow(merged_columns); } - current_key = std::move(next_key); - next_key.resize(description.size()); + current_key.swap(next_key); setRow(current_row, current); current_row_is_zero = false;