From 2ecbf0b0bb2431593f05853ca6aa10ce74518f82 Mon Sep 17 00:00:00 2001
From: Nikolai Kochetov
Date: Wed, 1 Apr 2020 14:45:02 +0300
Subject: [PATCH] Add SummingSortedTransform [part 3]

---
 .../Merges/SummingSortedTransform.cpp         | 202 ++++++++++++++++++
 .../Merges/SummingSortedTransform.h           |  39 +++-
 2 files changed, 239 insertions(+), 2 deletions(-)

diff --git a/dbms/src/Processors/Merges/SummingSortedTransform.cpp b/dbms/src/Processors/Merges/SummingSortedTransform.cpp
index 45f3f9b71c9..7ae127c2314 100644
--- a/dbms/src/Processors/Merges/SummingSortedTransform.cpp
+++ b/dbms/src/Processors/Merges/SummingSortedTransform.cpp
@@ -286,6 +286,48 @@ namespace
 
         return columns;
     }
+
+    /// Rearranges the columns of a merged chunk into their positions in the result header.
+    /// The chunk is read as one column per entry of `columns_to_aggregate` followed by
+    /// one column per entry of `column_numbers_not_to_aggregate`, in that order.
+    /// When `is_agg_func_type` is false and the function returns a tuple, the tuple column
+    /// is unpacked into the result positions listed in `desc.column_numbers`.
+    void finalizeChunk(
+        Chunk & chunk, size_t num_result_columns,
+        const SummingSortedTransform::ColumnsDefinition & columns_definition)
+    {
+        size_t num_rows = chunk.getNumRows();
+        auto columns = chunk.detachColumns();
+
+        Columns res_columns(num_result_columns);
+        size_t next_column = 0;
+
+        for (auto & desc : columns_definition.columns_to_aggregate)
+        {
+            auto column = std::move(columns[next_column]);
+            ++next_column;
+
+            if (!desc.is_agg_func_type && isTuple(desc.function->getReturnType()))
+            {
+                /// Unpack tuple into block.
+                size_t tuple_size = desc.column_numbers.size();
+                for (size_t i = 0; i < tuple_size; ++i)
+                    res_columns[desc.column_numbers[i]] = assert_cast<const ColumnTuple &>(*column).getColumnPtr(i);
+            }
+            else
+                res_columns[desc.column_numbers[0]] = std::move(column);
+        }
+
+        for (auto column_number : columns_definition.column_numbers_not_to_aggregate)
+        {
+            auto column = std::move(columns[next_column]);
+            ++next_column;
+
+            res_columns[column_number] = std::move(column);
+        }
+
+        chunk.setColumns(std::move(res_columns), num_rows);
+    }
 }
 
 SummingSortedTransform::SummingSortedTransform(
@@ -302,4 +344,164 @@ SummingSortedTransform::SummingSortedTransform(
     current_row.resize(num_columns);
 }
 
+/// Builds the sorting heap from `cursors` and marks it as initialized,
+/// so that subsequent consume() calls push updated cursors into it.
+void SummingSortedTransform::initializeInputs()
+{
+    queue = SortingHeap<SortCursor>(cursors);
+    is_queue_initialized = true;
+}
+
+/// Attaches the new chunk to input `input_number` via updateCursor() and,
+/// if the heap is already initialized, pushes that input's cursor into it.
+void SummingSortedTransform::consume(Chunk chunk, size_t input_number)
+{
+    updateCursor(std::move(chunk), input_number);
+
+    if (is_queue_initialized)
+        queue.push(cursors[input_number]);
+}
+
+/// Converts constant columns of the chunk to full columns and points the cursor of input `source_num` at it.
+/// If the input already held a chunk, that chunk is moved to `last_chunk` before being replaced.
+void SummingSortedTransform::updateCursor(Chunk chunk, size_t source_num)
+{
+    auto num_rows = chunk.getNumRows();
+    auto columns = chunk.detachColumns();
+    for (auto & column : columns)
+        column = column->convertToFullColumnIfConst();
+
+    chunk.setColumns(std::move(columns), num_rows);
+
+    auto & source_chunk = source_chunks[source_num];
+
+    if (source_chunk)
+    {
+        /// Extend lifetime of last chunk.
+        /// NOTE(review): `last_key` may still point to this cursor's `sort_columns`, which reset() below
+        /// presumably refills for the new chunk - confirm that the previous key stays valid.
+        last_chunk = std::move(source_chunk);
+        last_chunk_sort_columns = std::move(cursors[source_num].all_columns);
+
+        source_chunk = std::move(chunk);
+        cursors[source_num].reset(source_chunk.getColumns(), {});
+    }
+    else
+    {
+        if (cursors[source_num].has_collation)
+            throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::LOGICAL_ERROR);
+
+        source_chunk = std::move(chunk);
+        cursors[source_num] = SortCursorImpl(source_chunk.getColumns(), description, source_num);
+    }
+}
+
+/// Performs one merge step and calls prepareOutputChunk() for `merged_data`.
+/// If an output chunk was produced, its columns are rearranged with finalizeChunk().
+void SummingSortedTransform::work()
+{
+    merge();
+    prepareOutputChunk(merged_data);
+
+    if (has_output_chunk)
+        finalizeChunk(output_chunk, getOutputs().back().getHeader().columns(), columns_definition);
+}
+
+/// Groups consecutive rows with equal sort columns taken from the heap and accumulates each group
+/// via addRow() / mergeMap(). Returns early when `merged_data` has enough rows or when an input chunk
+/// is exhausted and more data was requested; sets `is_finished` once the queue is no longer valid.
+void SummingSortedTransform::merge()
+{
+    /// Take the rows in needed order and put them in `merged_columns` until rows no more than `max_block_size`
+    while (queue.isValid())
+    {
+        bool key_differs;
+        bool has_previous_group = !last_key.empty();
+
+        SortCursor current = queue.current();
+
+        {
+            RowRef current_key;
+            current_key.set(current);
+
+            if (!has_previous_group)    /// The first key encountered.
+            {
+                key_differs = true;
+                current_row_is_zero = true;
+            }
+            else
+                key_differs = !last_key.hasEqualSortColumnsWith(current_key);
+
+            last_key = current_key;
+            last_chunk_sort_columns.clear();
+        }
+
+        if (key_differs)
+        {
+            if (has_previous_group)
+                /// Write the data for the previous group.
+                insertCurrentRowIfNeeded();
+
+            if (merged_data.hasEnoughRows())
+            {
+                /// The block is now full and the last row is calculated completely.
+                last_key.reset();
+                return;
+            }
+
+            setRow(current_row, current);
+
+            /// Reset aggregation states for next row
+            for (auto & desc : columns_definition.columns_to_aggregate)
+                desc.createState();
+
+            // Start aggregations with current row
+            addRow(current);
+
+            if (columns_definition.maps_to_sum.empty())
+            {
+                /// We have only columns_to_aggregate. The status of current row will be determined
+                /// in 'insertCurrentRowIfNeeded' method on the values of aggregate functions.
+                current_row_is_zero = true; // NOLINT
+            }
+            else
+            {
+                /// We have complex maps that will be summed with 'mergeMap' method.
+                /// The single row is considered non zero, and the status after merging with other rows
+                /// will be determined in the branch below (when key_differs == false).
+                current_row_is_zero = false; // NOLINT
+            }
+        }
+        else
+        {
+            addRow(current);
+
+            // Merge maps only for same rows
+            for (const auto & desc : columns_definition.maps_to_sum)
+                if (mergeMap(desc, current_row, current))
+                    current_row_is_zero = false;
+        }
+
+        if (!current->isLast())
+        {
+            queue.next();
+        }
+        else
+        {
+            /// We get the next block from the corresponding source, if there is one.
+            queue.removeTop();
+            requestDataForInput(current.impl->order);
+            return;
+        }
+    }
+
+    /// We will write the data for the last group, if it is non-zero.
+    /// If it is zero, and without it the output stream will be empty, we will write it anyway.
+    insertCurrentRowIfNeeded();
+    last_chunk_sort_columns.clear();
+    is_finished = true;
+}
+
+
+
 }
diff --git a/dbms/src/Processors/Merges/SummingSortedTransform.h b/dbms/src/Processors/Merges/SummingSortedTransform.h
index 20b49aa7ac8..662cc65d95d 100644
--- a/dbms/src/Processors/Merges/SummingSortedTransform.h
+++ b/dbms/src/Processors/Merges/SummingSortedTransform.h
@@ -103,6 +103,13 @@ public:
         size_t getNumColumns() const { return column_numbers_not_to_aggregate.size() + columns_to_aggregate.size(); }
     };
 
+    String getName() const override { return "SummingSortedTransform"; }
+    void work() override;
+
+protected:
+    void initializeInputs() override;
+    void consume(Chunk chunk, size_t input_number) override;
+
 private:
     Row current_row;
     bool current_row_is_zero = true;    /// Are all summed columns zero (or empty)? It is updated incrementally.
@@ -125,10 +132,38 @@ private:
     struct RowRef
     {
         ColumnRawPtrs * sort_columns = nullptr; /// Point to sort_columns from SortCursor or last_chunk_sort_columns.
-        UInt64 row_number = 0;
+        UInt64 row_num = 0;
+
+        /// A reference with no sort columns (nullptr) denotes "no row".
+        bool empty() const { return sort_columns == nullptr; }
+        void reset() { sort_columns = nullptr; }
+
+        /// Points the reference at the cursor's sort columns and its position `pos`.
+        void set(SortCursor & cursor)
+        {
+            sort_columns = &cursor.impl->sort_columns;
+            row_num = cursor.impl->pos;
+        }
+
+        /// Compares the referenced rows column by column. Both references must be non-empty,
+        /// and `other` must have at least as many sort columns as this one.
+        bool hasEqualSortColumnsWith(const RowRef & other) const
+        {
+            auto size = sort_columns->size();
+            for (size_t col_number = 0; col_number < size; ++col_number)
+            {
+                auto & cur_column = (*sort_columns)[col_number];
+                auto & other_column = (*other.sort_columns)[col_number];
+
+                if (0 != cur_column->compareAt(row_num, other.row_num, *other_column, 1))
+                    return false;
+            }
+
+            return true;
+        }
     };
 
-    RowRef last_row;
+    RowRef last_key;
 
     SortingHeap<SortCursor> queue;
     bool is_queue_initialized = false;