diff --git a/dbms/src/Processors/Merges/AggregatingSortedTransform.cpp b/dbms/src/Processors/Merges/AggregatingSortedTransform.cpp new file mode 100644 index 00000000000..23524dfc395 --- /dev/null +++ b/dbms/src/Processors/Merges/AggregatingSortedTransform.cpp @@ -0,0 +1,252 @@ +#include + +#include +#include +#include +#include + +namespace DB +{ + +namespace +{ + AggregatingSortedTransform::ColumnsDefinition defineColumns( + const Block & header, const SortDescription & description) + { + AggregatingSortedTransform::ColumnsDefinition def = {}; + size_t num_columns = header.columns(); + + /// Fill in the column numbers that need to be aggregated. + for (size_t i = 0; i < num_columns; ++i) + { + const ColumnWithTypeAndName & column = header.safeGetByPosition(i); + + /// We leave only states of aggregate functions. + if (!dynamic_cast(column.type.get()) + && !dynamic_cast(column.type->getCustomName())) + { + def.column_numbers_not_to_aggregate.push_back(i); + continue; + } + + /// Included into PK? + auto it = description.begin(); + for (; it != description.end(); ++it) + if (it->column_name == column.name || (it->column_name.empty() && it->column_number == i)) + break; + + if (it != description.end()) + { + def.column_numbers_not_to_aggregate.push_back(i); + continue; + } + + if (auto simple_aggr = dynamic_cast(column.type->getCustomName())) + { + auto type = recursiveRemoveLowCardinality(column.type); + if (type.get() == column.type.get()) + type = nullptr; + + // simple aggregate function + AggregatingSortedTransform::SimpleAggregateDescription desc(simple_aggr->getFunction(), i, type); + if (desc.function->allocatesMemoryInArena()) + def.allocates_memory_in_arena = true; + + def.columns_to_simple_aggregate.emplace_back(std::move(desc)); + } + else + { + // standard aggregate function + def.columns_to_aggregate.emplace_back(i); + } + } + } +} + +AggregatingSortedTransform::AggregatingSortedTransform( + size_t num_inputs, const Block & header, + SortDescription description_, size_t max_block_size) + : IMergingTransform(num_inputs, header, header, true) + , columns_definition(defineColumns(header, description_)) + , merged_data(header.cloneEmptyColumns(), false, max_block_size) + , description(std::move(description_)) + , source_chunks(num_inputs) + , cursors(num_inputs) +{ + merged_data.initAggregateDescription(columns_definition); +} + +void AggregatingSortedTransform::initializeInputs() +{ + queue = SortingHeap(cursors); + is_queue_initialized = true; +} + +void AggregatingSortedTransform::consume(Chunk chunk, size_t input_number) +{ + updateCursor(std::move(chunk), input_number); + + if (is_queue_initialized) + queue.push(cursors[input_number]); +} + +void AggregatingSortedTransform::updateCursor(Chunk chunk, size_t source_num) +{ + auto num_rows = chunk.getNumRows(); + auto columns = chunk.detachColumns(); + + for (auto & column : columns) + column = column->convertToFullColumnIfConst(); + + for (auto & desc : columns_definition.columns_to_simple_aggregate) + if (desc.type_to_convert) + columns[desc.column_number] = recursiveRemoveLowCardinality(columns[desc.column_number]); + + chunk.setColumns(std::move(columns), num_rows); + + auto & source_chunk = source_chunks[source_num]; + + if (source_chunk) + { + /// Extend lifetime of last chunk. + last_chunk = std::move(source_chunk); + last_chunk_sort_columns = std::move(cursors[source_num].all_columns); + + source_chunk = std::move(chunk); + cursors[source_num].reset(source_chunk.getColumns(), {}); + } + else + { + if (cursors[source_num].has_collation) + throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::LOGICAL_ERROR); + + source_chunk = std::move(chunk); + cursors[source_num] = SortCursorImpl(source_chunk.getColumns(), description, source_num); + } +} + +void AggregatingSortedTransform::work() +{ + merge(); + prepareOutputChunk(merged_data); + + if (has_output_chunk) + { + size_t num_rows = output_chunk.getNumRows(); + auto columns = output_chunk.detachColumns(); + auto & header = getOutputs().back().getHeader(); + + for (auto & desc : columns_definition.columns_to_simple_aggregate) + { + if (desc.type_to_convert) + { + auto & from_type = header.getByPosition(desc.column_number).type; + auto & to_type = desc.type_to_convert; + columns[desc.column_number] = recursiveTypeConversion(columns[desc.column_number], from_type, to_type); + } + } + + output_chunk.setColumns(std::move(columns), num_rows); + + merged_data.initAggregateDescription(columns_definition); + } +} + +void AggregatingSortedTransform::merge() +{ + /// We take the rows in the correct order and put them in `merged_block`, while the rows are no more than `max_block_size` + while (queue.isValid()) + { + bool key_differs; + bool has_previous_group = !last_key.empty(); + + SortCursor current = queue.current(); + + { + detail::RowRef current_key; + current_key.set(current); + + if (!has_previous_group) /// The first key encountered. + key_differs = true; + else + key_differs = !last_key.hasEqualSortColumnsWith(current_key); + + last_key = current_key; + last_chunk_sort_columns.clear(); + } + + if (key_differs) + { + /// if there are enough rows accumulated and the last one is calculated completely + if (merged_data.hasEnoughRows()) + { + /// Write the simple aggregation result for the previous group. + insertSimpleAggregationResult(); + return; + } + + /// We will write the data for the group. We copy the values of ordinary columns. + merged_data.insertRow(current->all_columns, current->pos, + columns_definition.column_numbers_not_to_aggregate); + + /// Add the empty aggregation state to the aggregate columns. The state will be updated in the `addRow` function. + for (auto & column_to_aggregate : columns_definition.columns_to_aggregate) + column_to_aggregate.column->insertDefault(); + + /// Write the simple aggregation result for the previous group. + if (merged_data.mergedRows() > 0) + insertSimpleAggregationResult(); + + /// Reset simple aggregation states for next row + for (auto & desc : columns_definition.columns_to_simple_aggregate) + desc.createState(); + + if (columns_definition.allocates_memory_in_arena) + arena = std::make_unique(); + } + + addRow(current); + + if (!current->isLast()) + { + queue.next(); + } + else + { + /// We get the next block from the corresponding source, if there is one. + queue.removeTop(); + requestDataForInput(current.impl->order); + return; + } + } + + /// Write the simple aggregation result for the previous group. + if (merged_data.mergedRows() > 0) + insertSimpleAggregationResult(); + + last_chunk_sort_columns.clear(); + is_finished = true; +} + +void AggregatingSortedTransform::addRow(SortCursor & cursor) +{ + for (auto & desc : columns_definition.columns_to_aggregate) + desc.column->insertMergeFrom(*cursor->all_columns[desc.column_number], cursor->pos); + + for (auto & desc : columns_definition.columns_to_simple_aggregate) + { + auto & col = cursor->all_columns[desc.column_number]; + desc.add_function(desc.function.get(), desc.state.data(), &col, cursor->pos, arena.get()); + } +} + +void AggregatingSortedTransform::insertSimpleAggregationResult() +{ + for (auto & desc : columns_definition.columns_to_simple_aggregate) + { + desc.function->insertResultInto(desc.state.data(), *desc.column); + desc.destroyState(); + } +} + +} diff --git a/dbms/src/Processors/Merges/AggregatingSortedTransform.h b/dbms/src/Processors/Merges/AggregatingSortedTransform.h new file mode 100644 index 00000000000..613ac0baa58 --- /dev/null +++ b/dbms/src/Processors/Merges/AggregatingSortedTransform.h @@ -0,0 +1,162 @@ +#pragma once + +#include +#include +#include + +#include +#include +#include +#include +#include + +namespace DB +{ + +class ColumnAggregateFunction; + +class AggregatingSortedTransform : public IMergingTransform +{ +public: + AggregatingSortedTransform( + size_t num_inputs, const Block & header, + SortDescription description_, size_t max_block_size); + + struct SimpleAggregateDescription; + + struct ColumnsDefinition + { + struct AggregateDescription + { + ColumnAggregateFunction * column = nullptr; + const size_t column_number = 0; + + AggregateDescription() = default; + explicit AggregateDescription(size_t col_number) : column_number(col_number) {} + }; + + /// Columns with which numbers should not be aggregated. + ColumnNumbers column_numbers_not_to_aggregate; + std::vector columns_to_aggregate; + std::vector columns_to_simple_aggregate; + + /// Does SimpleAggregateFunction allocates memory in arena? + bool allocates_memory_in_arena = false; + }; + + String getName() const override { return "AggregatingSortedTransform"; } + void work() override; + +protected: + void initializeInputs() override; + void consume(Chunk chunk, size_t input_number) override; + +private: + + /// Specialization for SummingSortedTransform. Inserts only data for non-aggregated columns. + struct AggregatingMergedData : public MergedData + { + public: + using MergedData::MergedData; + + void insertRow(const ColumnRawPtrs & raw_columns, size_t row, const ColumnNumbers & column_numbers) + { + for (auto column_number :column_numbers) + columns[column_number]->insertFrom(*raw_columns[column_number], row); + + ++total_merged_rows; + ++merged_rows; + /// TODO: sum_blocks_granularity += block_size; + } + + /// Initialize aggregate descriptions with columns. + void initAggregateDescription(ColumnsDefinition & def) + { + for (auto & desc : def.columns_to_simple_aggregate) + desc.column = columns[desc.column_number].get(); + + for (auto & desc : def.columns_to_aggregate) + desc.column = typeid_cast(columns[desc.column_number].get()); + } + }; + + ColumnsDefinition columns_definition; + AggregatingMergedData merged_data; + + SortDescription description; + + /// Chunks currently being merged. + std::vector source_chunks; + SortCursorImpls cursors; + + /// In merging algorithm, we need to compare current sort key with the last one. + /// So, sorting columns for last row needed to be stored. + /// In order to do it, we extend lifetime of last chunk and it's sort columns (from corresponding sort cursor). + Chunk last_chunk; + ColumnRawPtrs last_chunk_sort_columns; /// Point to last_chunk if valid. + + detail::RowRef last_key; + + SortingHeap queue; + bool is_queue_initialized = false; + + /// Memory pool for SimpleAggregateFunction + /// (only when allocates_memory_in_arena == true). + std::unique_ptr arena; + + void merge(); + void updateCursor(Chunk chunk, size_t source_num); + void addRow(SortCursor & cursor); + void insertSimpleAggregationResult(); + +public: + /// Stores information for aggregation of SimpleAggregateFunction columns + struct SimpleAggregateDescription + { + /// An aggregate function 'anyLast', 'sum'... + AggregateFunctionPtr function; + IAggregateFunction::AddFunc add_function = nullptr; + + size_t column_number = 0; + IColumn * column = nullptr; + const DataTypePtr type_to_convert; + + AlignedBuffer state; + bool created = false; + + SimpleAggregateDescription(AggregateFunctionPtr function_, const size_t column_number_, DataTypePtr type) + : function(std::move(function_)), column_number(column_number_), type_to_convert(std::move(type)) + { + add_function = function->getAddressOfAddFunction(); + state.reset(function->sizeOfData(), function->alignOfData()); + } + + void createState() + { + if (created) + return; + function->create(state.data()); + created = true; + } + + void destroyState() + { + if (!created) + return; + function->destroy(state.data()); + created = false; + } + + /// Explicitly destroy aggregation state if the stream is terminated + ~SimpleAggregateDescription() + { + destroyState(); + } + + SimpleAggregateDescription() = default; + SimpleAggregateDescription(SimpleAggregateDescription &&) = default; + SimpleAggregateDescription(const SimpleAggregateDescription &) = delete; + }; +}; + +} diff --git a/dbms/src/Processors/Merges/CollapsingSortedTransform.h b/dbms/src/Processors/Merges/CollapsingSortedTransform.h index 58c97f964bc..46e3fb2e693 100644 --- a/dbms/src/Processors/Merges/CollapsingSortedTransform.h +++ b/dbms/src/Processors/Merges/CollapsingSortedTransform.h @@ -64,7 +64,7 @@ private: SortingHeap queue; bool is_queue_initialized = false; - using RowRef = detail::RowRef; + using RowRef = detail::RowRefWithOwnedChunk; static constexpr size_t max_row_refs = 4; /// first_negative, last_positive, last, current. RowRef first_negative_row; RowRef last_positive_row; diff --git a/dbms/src/Processors/Merges/ReplacingSortedTransform.h b/dbms/src/Processors/Merges/ReplacingSortedTransform.h index 4f4b71c5b13..d28bd239cfe 100644 --- a/dbms/src/Processors/Merges/ReplacingSortedTransform.h +++ b/dbms/src/Processors/Merges/ReplacingSortedTransform.h @@ -13,7 +13,7 @@ namespace DB { -class ReplacingSortedTransform : public IMergingTransform +class ReplacingSortedTransform final : public IMergingTransform { public: ReplacingSortedTransform( @@ -50,7 +50,7 @@ private: SortingHeap queue; bool is_queue_initialized = false; - using RowRef = detail::RowRef; + using RowRef = detail::RowRefWithOwnedChunk; static constexpr size_t max_row_refs = 3; /// last, current, selected. RowRef last_row; /// RowRef next_key; /// Primary key of next row. diff --git a/dbms/src/Processors/Merges/RowRef.h b/dbms/src/Processors/Merges/RowRef.h index 67d32d11277..ac4be79f560 100644 --- a/dbms/src/Processors/Merges/RowRef.h +++ b/dbms/src/Processors/Merges/RowRef.h @@ -103,10 +103,46 @@ inline void intrusive_ptr_release(SharedChunk * ptr) } /// This class represents a row in a chunk. -/// RowRef hold shared pointer to this chunk, possibly extending its life time. +struct RowRef +{ + ColumnRawPtrs * sort_columns = nullptr; /// Point to sort_columns from SortCursor or last_chunk_sort_columns. + UInt64 row_num = 0; + + bool empty() const { return sort_columns == nullptr; } + void reset() { sort_columns = nullptr; } + + void set(SortCursor & cursor) + { + sort_columns = &cursor.impl->sort_columns; + row_num = cursor.impl->pos; + } + + static bool checkEquals(const ColumnRawPtrs * left, size_t left_row, const ColumnRawPtrs * right, size_t right_row) + { + auto size = left->size(); + for (size_t col_number = 0; col_number < size; ++col_number) + { + auto & cur_column = (*left)[col_number]; + auto & other_column = (*right)[col_number]; + + if (0 != cur_column->compareAt(left_row, right_row, *other_column, 1)) + return false; + } + + return true; + } + + bool hasEqualSortColumnsWith(const RowRef & other) + { + return checkEquals(sort_columns, row_num, other.sort_columns, other.row_num); + } +}; + +/// This class also represents a row in a chunk. +/// RowRefWithOwnedChunk hold shared pointer to this chunk, possibly extending its life time. /// It is needed, for example, in CollapsingTransform, where we need to store first negative row for current sort key. /// We do not copy data itself, because it may be potentially changed for each row. Performance for `set` is important. -struct RowRef +struct RowRefWithOwnedChunk { detail::SharedChunkPtr owned_chunk = nullptr; @@ -114,7 +150,7 @@ struct RowRef ColumnRawPtrs * sort_columns = nullptr; UInt64 row_num = 0; - void swap(RowRef & other) + void swap(RowRefWithOwnedChunk & other) { owned_chunk.swap(other.owned_chunk); std::swap(all_columns, other.all_columns); @@ -140,19 +176,9 @@ struct RowRef sort_columns = &owned_chunk->sort_columns; } - bool hasEqualSortColumnsWith(const RowRef & other) + bool hasEqualSortColumnsWith(const RowRefWithOwnedChunk & other) { - auto size = sort_columns->size(); - for (size_t col_number = 0; col_number < size; ++col_number) - { - auto & cur_column = (*sort_columns)[col_number]; - auto & other_column = (*other.sort_columns)[col_number]; - - if (0 != cur_column->compareAt(row_num, other.row_num, *other_column, 1)) - return false; - } - - return true; + return RowRef::checkEquals(sort_columns, row_num, other.sort_columns, other.row_num); } }; diff --git a/dbms/src/Processors/Merges/SummingSortedTransform.cpp b/dbms/src/Processors/Merges/SummingSortedTransform.cpp index 0741dc1cd10..99008025232 100644 --- a/dbms/src/Processors/Merges/SummingSortedTransform.cpp +++ b/dbms/src/Processors/Merges/SummingSortedTransform.cpp @@ -168,7 +168,7 @@ namespace std::find(column_names_to_sum.begin(), column_names_to_sum.end(), column.name)) { // Create aggregator to sum this column - detail::AggregateDescription desc; + SummingSortedTransform::AggregateDescription desc; desc.is_agg_func_type = is_agg_func; desc.column_numbers = {i}; @@ -211,7 +211,7 @@ namespace } DataTypes argument_types; - detail::AggregateDescription desc; + SummingSortedTransform::AggregateDescription desc; SummingSortedTransform::MapDescription map_desc; column_num_it = map.second.begin(); diff --git a/dbms/src/Processors/Merges/SummingSortedTransform.h b/dbms/src/Processors/Merges/SummingSortedTransform.h index 678ff6587a7..e7915cd3c8c 100644 --- a/dbms/src/Processors/Merges/SummingSortedTransform.h +++ b/dbms/src/Processors/Merges/SummingSortedTransform.h @@ -2,6 +2,7 @@ #include #include +#include #include #include @@ -13,8 +14,100 @@ namespace DB { -namespace detail +class SummingSortedTransform final : public IMergingTransform { +public: + + SummingSortedTransform( + size_t num_inputs, const Block & header, + SortDescription description_, + /// List of columns to be summed. If empty, all numeric columns that are not in the description are taken. + const Names & column_names_to_sum, + size_t max_block_size); + + struct AggregateDescription; + + /// Stores numbers of key-columns and value-columns. + struct MapDescription + { + std::vector key_col_nums; + std::vector val_col_nums; + }; + + struct ColumnsDefinition + { + /// Columns with which values should be summed. + ColumnNumbers column_numbers_not_to_aggregate; + /// Columns which should be aggregated. + std::vector columns_to_aggregate; + /// Mapping for nested columns. + std::vector maps_to_sum; + + size_t getNumColumns() const { return column_numbers_not_to_aggregate.size() + columns_to_aggregate.size(); } + }; + + /// Specialization for SummingSortedTransform. Inserts only data for non-aggregated columns. + struct SummingMergedData : public MergedData + { + public: + using MergedData::MergedData; + + void insertRow(const Row & row, const ColumnNumbers & column_numbers) + { + for (auto column_number :column_numbers) + columns[column_number]->insert(row[column_number]); + + ++total_merged_rows; + ++merged_rows; + /// TODO: sum_blocks_granularity += block_size; + } + + /// Initialize aggregate descriptions with columns. + void initAggregateDescription(std::vector & columns_to_aggregate) + { + size_t num_columns = columns_to_aggregate.size(); + for (size_t column_number = 0; column_number < num_columns; ++column_number) + columns_to_aggregate[column_number].merged_column = columns[column_number].get(); + } + }; + + String getName() const override { return "SummingSortedTransform"; } + void work() override; + +protected: + void initializeInputs() override; + void consume(Chunk chunk, size_t input_number) override; + +private: + Row current_row; + bool current_row_is_zero = true; /// Are all summed columns zero (or empty)? It is updated incrementally. + + ColumnsDefinition columns_definition; + SummingMergedData merged_data; + + SortDescription description; + + /// Chunks currently being merged. + std::vector source_chunks; + SortCursorImpls cursors; + + /// In merging algorithm, we need to compare current sort key with the last one. + /// So, sorting columns for last row needed to be stored. + /// In order to do it, we extend lifetime of last chunk and it's sort columns (from corresponding sort cursor). + Chunk last_chunk; + ColumnRawPtrs last_chunk_sort_columns; /// Point to last_chunk if valid. + + detail::RowRef last_key; + + SortingHeap queue; + bool is_queue_initialized = false; + + void merge(); + void updateCursor(Chunk chunk, size_t source_num); + void addRow(SortCursor & cursor); + void insertCurrentRowIfNeeded(); + +public: /// Stores aggregation function, state, and columns to be used as function arguments. struct AggregateDescription { @@ -66,128 +159,6 @@ namespace detail AggregateDescription(AggregateDescription &&) = default; AggregateDescription(const AggregateDescription &) = delete; }; - - /// Specialization for SummingSortedTransform. Inserts only data for non-aggregated columns. - struct SummingMergedData : public MergedData - { - public: - using MergedData::MergedData; - - void insertRow(const Row & row, const ColumnNumbers & column_numbers) - { - for (auto column_number :column_numbers) - columns[column_number]->insert(row[column_number]); - - ++total_merged_rows; - ++merged_rows; - /// TODO: sum_blocks_granularity += block_size; - } - - /// Initialize aggregate descriptions with columns. - void initAggregateDescription(std::vector & columns_to_aggregate) - { - size_t num_columns = columns_to_aggregate.size(); - for (size_t column_number = 0; column_number < num_columns; ++column_number) - columns_to_aggregate[column_number].merged_column = columns[column_number].get(); - } - }; - - struct RowRef - { - ColumnRawPtrs * sort_columns = nullptr; /// Point to sort_columns from SortCursor or last_chunk_sort_columns. - UInt64 row_num = 0; - - bool empty() const { return sort_columns == nullptr; } - void reset() { sort_columns = nullptr; } - - void set(SortCursor & cursor) - { - sort_columns = &cursor.impl->sort_columns; - row_num = cursor.impl->pos; - } - - bool hasEqualSortColumnsWith(const RowRef & other) - { - auto size = sort_columns->size(); - for (size_t col_number = 0; col_number < size; ++col_number) - { - auto & cur_column = (*sort_columns)[col_number]; - auto & other_column = (*other.sort_columns)[col_number]; - - if (0 != cur_column->compareAt(row_num, other.row_num, *other_column, 1)) - return false; - } - - return true; - } - }; -} - -class SummingSortedTransform : public IMergingTransform -{ -public: - - SummingSortedTransform( - size_t num_inputs, const Block & header, - SortDescription description_, - /// List of columns to be summed. If empty, all numeric columns that are not in the description are taken. - const Names & column_names_to_sum, - size_t max_block_size); - - /// Stores numbers of key-columns and value-columns. - struct MapDescription - { - std::vector key_col_nums; - std::vector val_col_nums; - }; - - struct ColumnsDefinition - { - /// Columns with which values should be summed. - ColumnNumbers column_numbers_not_to_aggregate; - /// Columns which should be aggregated. - std::vector columns_to_aggregate; - /// Mapping for nested columns. - std::vector maps_to_sum; - - size_t getNumColumns() const { return column_numbers_not_to_aggregate.size() + columns_to_aggregate.size(); } - }; - - String getName() const override { return "SummingSortedTransform"; } - void work() override; - -protected: - void initializeInputs() override; - void consume(Chunk chunk, size_t input_number) override; - -private: - Row current_row; - bool current_row_is_zero = true; /// Are all summed columns zero (or empty)? It is updated incrementally. - - ColumnsDefinition columns_definition; - detail::SummingMergedData merged_data; - - SortDescription description; - - /// Chunks currently being merged. - std::vector source_chunks; - SortCursorImpls cursors; - - /// In merging algorithm, we need to compare current sort key with the last one. - /// So, sorting columns for last row needed to be stored. - /// In order to do it, we extend lifetime of last chunk and it's sort columns (from corresponding sort cursor). - Chunk last_chunk; - ColumnRawPtrs last_chunk_sort_columns; /// Point to last_chunk if valid. - - detail::RowRef last_key; - - SortingHeap queue; - bool is_queue_initialized = false; - - void merge(); - void updateCursor(Chunk chunk, size_t source_num); - void addRow(SortCursor & cursor); - void insertCurrentRowIfNeeded(); }; } diff --git a/dbms/src/Processors/Merges/VersionedCollapsingTransform.h b/dbms/src/Processors/Merges/VersionedCollapsingTransform.h index 722bd30feca..0dbdf8e2a40 100644 --- a/dbms/src/Processors/Merges/VersionedCollapsingTransform.h +++ b/dbms/src/Processors/Merges/VersionedCollapsingTransform.h @@ -15,7 +15,7 @@ namespace DB { -class VersionedCollapsingTransform : public IMergingTransform +class VersionedCollapsingTransform final : public IMergingTransform { public: /// Don't need version column. It's in primary key. @@ -53,7 +53,7 @@ private: SortingHeap queue; bool is_queue_initialized = false; - using RowRef = detail::RowRef; + using RowRef = detail::RowRefWithOwnedChunk; const size_t max_rows_in_queue; /// Rows with the same primary key and sign. FixedSizeDequeWithGaps current_keys;