diff --git a/dbms/src/Processors/Merges/CollapsingSortedTransform.cpp b/dbms/src/Processors/Merges/CollapsingSortedTransform.cpp
index 9fcb7cea116..009aed0983f 100644
--- a/dbms/src/Processors/Merges/CollapsingSortedTransform.cpp
+++ b/dbms/src/Processors/Merges/CollapsingSortedTransform.cpp
@@ -29,6 +29,8 @@ CollapsingSortedTransform::CollapsingSortedTransform(
     , description(std::move(description_))
     , sign_column_number(header.getPositionByName(sign_column))
     , out_row_sources_buf(out_row_sources_buf_)
+    , source_chunks(num_inputs)
+    , cursors(num_inputs)
     , chunk_allocator(num_inputs + max_row_refs)
 {
 }
diff --git a/dbms/src/Processors/Merges/ReplacingSortedTransform.cpp b/dbms/src/Processors/Merges/ReplacingSortedTransform.cpp
index 65654a98764..e39b33a5a46 100644
--- a/dbms/src/Processors/Merges/ReplacingSortedTransform.cpp
+++ b/dbms/src/Processors/Merges/ReplacingSortedTransform.cpp
@@ -19,6 +19,8 @@ ReplacingSortedTransform::ReplacingSortedTransform(
     , merged_data(header.cloneEmptyColumns(), use_average_block_sizes, max_block_size)
     , description(std::move(description_))
     , out_row_sources_buf(out_row_sources_buf_)
+    , source_chunks(num_inputs)
+    , cursors(num_inputs)
     , chunk_allocator(num_inputs + max_row_refs)
 {
     if (!version_column.empty())
diff --git a/dbms/src/Processors/Merges/SummingSortedTransform.cpp b/dbms/src/Processors/Merges/SummingSortedTransform.cpp
index 7ae127c2314..87bf533d5c5 100644
--- a/dbms/src/Processors/Merges/SummingSortedTransform.cpp
+++ b/dbms/src/Processors/Merges/SummingSortedTransform.cpp
@@ -1,14 +1,23 @@
 #include
+
 #include
 #include
-#include
-#include
-#include
+#include
 #include
+#include
+#include
+#include
+#include
 
 namespace DB
 {
+namespace ErrorCodes
+{
+    extern const int LOGICAL_ERROR;
+    extern const int CORRUPTED_DATA;
+}
+
 namespace
 {
     bool isInPrimaryKey(const SortDescription & description, const std::string & name, const size_t number)
@@ -159,7 +168,7 @@ namespace
                 std::find(column_names_to_sum.begin(), column_names_to_sum.end(), column.name))
             {
                 // Create aggregator to sum this column
-                SummingSortedTransform::AggregateDescription desc;
+                detail::AggregateDescription desc;
                 desc.is_agg_func_type = is_agg_func;
                 desc.column_numbers = {i};
@@ -202,7 +211,7 @@ namespace
             }
 
             DataTypes argument_types;
-            SummingSortedTransform::AggregateDescription desc;
+            detail::AggregateDescription desc;
             SummingSortedTransform::MapDescription map_desc;
 
             column_num_it = map.second.begin();
@@ -323,20 +332,52 @@ namespace
         chunk.setColumns(std::move(res_columns), num_rows);
     }
+
+    void setRow(Row & row, SortCursor & cursor, const Block & header)
+    {
+        size_t num_columns = row.size();
+        for (size_t i = 0; i < num_columns; ++i)
+        {
+            try
+            {
+                cursor->all_columns[i]->get(cursor->pos, row[i]);
+            }
+            catch (...)
+            {
+                tryLogCurrentException(__PRETTY_FUNCTION__);
+
+                /// Find out the name of the column and throw more informative exception.
+
+                String column_name;
+                if (i < header.columns())
+                    column_name = header.safeGetByPosition(i).name;
+
+                throw Exception("MergingSortedBlockInputStream failed to read row " + toString(cursor->pos)
+                                + " of column " + toString(i) + (column_name.empty() ? "" : " (" + column_name + ")"),
+                                ErrorCodes::CORRUPTED_DATA);
+            }
+        }
+    }
 }
 
 SummingSortedTransform::SummingSortedTransform(
-        size_t num_inputs, const Block & header,
-        SortDescription description_,
-        /// List of columns to be summed. If empty, all numeric columns that are not in the description are taken.
-        const Names & column_names_to_sum,
-        size_t max_block_size)
-        : IMergingTransform(num_inputs, header, header, true)
-        , columns_definition(defineColumns(header, description_, column_names_to_sum))
-        , merged_data(getMergedDataColumns(header, columns_definition), false, max_block_size)
+    size_t num_inputs, const Block & header,
+    SortDescription description_,
+    /// List of columns to be summed. If empty, all numeric columns that are not in the description are taken.
+    const Names & column_names_to_sum,
+    size_t max_block_size)
+    : IMergingTransform(num_inputs, header, header, true)
+    , columns_definition(defineColumns(header, description_, column_names_to_sum))
+    , merged_data(getMergedDataColumns(header, columns_definition), false, max_block_size)
+    , description(std::move(description_))
+    , source_chunks(num_inputs)
+    , cursors(num_inputs)
 {
-    size_t num_columns = header.columns();
-    current_row.resize(num_columns);
+    current_row.resize(header.columns());
+    merged_data.initAggregateDescription(columns_definition.columns_to_aggregate);
 }
 
 void SummingSortedTransform::initializeInputs()
@@ -389,7 +430,103 @@ void SummingSortedTransform::work()
     prepareOutputChunk(merged_data);
 
     if (has_output_chunk)
+    {
         finalizeChunk(output_chunk, getOutputs().back().getHeader().columns(), columns_definition);
+        merged_data.initAggregateDescription(columns_definition.columns_to_aggregate);
+    }
+}
+
+void SummingSortedTransform::insertCurrentRowIfNeeded()
+{
+    /// We have nothing to aggregate. The row still can be non-zero, because we have columns_not_to_aggregate.
+    if (columns_definition.columns_to_aggregate.empty())
+        current_row_is_zero = false;
+
+    for (auto & desc : columns_definition.columns_to_aggregate)
+    {
+        // Do not insert if the aggregation state hasn't been created
+        if (desc.created)
+        {
+            if (desc.is_agg_func_type)
+            {
+                current_row_is_zero = false;
+            }
+            else
+            {
+                try
+                {
+                    desc.function->insertResultInto(desc.state.data(), *desc.merged_column);
+
+                    /// Update zero status of current row
+                    if (desc.column_numbers.size() == 1)
+                    {
+                        // Flag row as non-empty if at least one summed column is non-zero
+                        current_row_is_zero = current_row_is_zero && desc.merged_column->isDefaultAt(desc.merged_column->size() - 1);
+                    }
+                    else
+                    {
+                        /// It is sumMapWithOverflow aggregate function.
+                        /// Assume that the row isn't empty in this case (just because it is compatible with previous version)
+                        current_row_is_zero = false;
+                    }
+                }
+                catch (...)
+ { + desc.destroyState(); + throw; + } + } + desc.destroyState(); + } + else + desc.merged_column->insertDefault(); + } + + /// If it is "zero" row, then rollback the insertion + /// (at this moment we need rollback only cols from columns_to_aggregate) + if (current_row_is_zero) + { + for (auto & desc : columns_definition.columns_to_aggregate) + desc.merged_column->popBack(1); + + return; + } + + merged_data.insertRow(current_row, columns_definition.column_numbers_not_to_aggregate); +} + +void SummingSortedTransform::addRow(SortCursor & cursor) +{ + for (auto & desc : columns_definition.columns_to_aggregate) + { + if (!desc.created) + throw Exception("Logical error in SummingSortedBlockInputStream, there are no description", ErrorCodes::LOGICAL_ERROR); + + if (desc.is_agg_func_type) + { + // desc.state is not used for AggregateFunction types + auto & col = cursor->all_columns[desc.column_numbers[0]]; + assert_cast(*desc.merged_column).insertMergeFrom(*col, cursor->pos); + } + else + { + // Specialized case for unary functions + if (desc.column_numbers.size() == 1) + { + auto & col = cursor->all_columns[desc.column_numbers[0]]; + desc.add_function(desc.function.get(), desc.state.data(), &col, cursor->pos, nullptr); + } + else + { + // Gather all source columns into a vector + ColumnRawPtrs columns(desc.column_numbers.size()); + for (size_t i = 0; i < desc.column_numbers.size(); ++i) + columns[i] = cursor->all_columns[desc.column_numbers[i]]; + + desc.add_function(desc.function.get(), desc.state.data(), columns.data(), cursor->pos, nullptr); + } + } + } } void SummingSortedTransform::merge() @@ -403,7 +540,7 @@ void SummingSortedTransform::merge() SortCursor current = queue.current(); { - RowRef current_key; + detail::RowRef current_key; current_key.set(current); if (!has_previous_group) /// The first key encountered. @@ -431,7 +568,7 @@ void SummingSortedTransform::merge() return; } - setRow(current_row, current); + setRow(current_row, current, getInputs().front().getHeader()); /// Reset aggregation states for next row for (auto & desc : columns_definition.columns_to_aggregate) diff --git a/dbms/src/Processors/Merges/SummingSortedTransform.h b/dbms/src/Processors/Merges/SummingSortedTransform.h index 662cc65d95d..678ff6587a7 100644 --- a/dbms/src/Processors/Merges/SummingSortedTransform.h +++ b/dbms/src/Processors/Merges/SummingSortedTransform.h @@ -13,20 +13,9 @@ namespace DB { - - -class SummingSortedTransform : public IMergingTransform +namespace detail { -public: - - SummingSortedTransform( - size_t num_inputs, const Block & header, - SortDescription description_, - /// List of columns to be summed. If empty, all numeric columns that are not in the description are taken. - const Names & column_names_to_sum, - size_t max_block_size); - - /// Stores aggregation function, state, and columns to be used as function arguments + /// Stores aggregation function, state, and columns to be used as function arguments. struct AggregateDescription { /// An aggregate function 'sumWithOverflow' or 'sumMapWithOverflow' for summing. @@ -78,57 +67,31 @@ public: AggregateDescription(const AggregateDescription &) = delete; }; + /// Specialization for SummingSortedTransform. Inserts only data for non-aggregated columns. 
     struct SummingMergedData : public MergedData
     {
     public:
        using MergedData::MergedData;
+
+        void insertRow(const Row & row, const ColumnNumbers & column_numbers)
+        {
+            for (auto column_number : column_numbers)
+                columns[column_number]->insert(row[column_number]);
+
+            ++total_merged_rows;
+            ++merged_rows;
+            /// TODO: sum_blocks_granularity += block_size;
+        }
+
+        /// Initialize aggregate descriptions with columns.
+        void initAggregateDescription(std::vector<AggregateDescription> & columns_to_aggregate)
+        {
+            size_t num_columns = columns_to_aggregate.size();
+            for (size_t column_number = 0; column_number < num_columns; ++column_number)
+                columns_to_aggregate[column_number].merged_column = columns[column_number].get();
+        }
     };
 
-    /// Stores numbers of key-columns and value-columns.
-    struct MapDescription
-    {
-        std::vector<size_t> key_col_nums;
-        std::vector<size_t> val_col_nums;
-    };
-
-    struct ColumnsDefinition
-    {
-        /// Columns with which values should be summed.
-        ColumnNumbers column_numbers_not_to_aggregate;
-        /// Columns which should be aggregated.
-        std::vector<AggregateDescription> columns_to_aggregate;
-        /// Mapping for nested columns.
-        std::vector<MapDescription> maps_to_sum;
-
-        size_t getNumColumns() const { return column_numbers_not_to_aggregate.size() + columns_to_aggregate.size(); }
-    };
-
-    String getName() const override { return "SummingSortedTransform"; }
-    void work() override;
-
-protected:
-    void initializeInputs() override;
-    void consume(Chunk chunk, size_t input_number) override;
-
-private:
-    Row current_row;
-    bool current_row_is_zero = true; /// Are all summed columns zero (or empty)? It is updated incrementally.
-
-    ColumnsDefinition columns_definition;
-    SummingMergedData merged_data;
-
-    SortDescription description;
-
-    /// Chunks currently being merged.
-    std::vector<Chunk> source_chunks;
-    SortCursorImpls cursors;
-
-    /// In merging algorithm, we need to compare current sort key with the last one.
-    /// So, sorting columns for last row needed to be stored.
-    /// In order to do it, we extend lifetime of last chunk and it's sort columns (from corresponding sort cursor).
-    Chunk last_chunk;
-    ColumnRawPtrs last_chunk_sort_columns; /// Point to last_chunk if valid.
-
     struct RowRef
     {
        ColumnRawPtrs * sort_columns = nullptr; /// Point to sort_columns from SortCursor or last_chunk_sort_columns.
@@ -158,15 +121,73 @@
            return true;
        }
     };
+}
 
-    RowRef last_key;
+class SummingSortedTransform : public IMergingTransform
+{
+public:
+
+    SummingSortedTransform(
+        size_t num_inputs, const Block & header,
+        SortDescription description_,
+        /// List of columns to be summed. If empty, all numeric columns that are not in the description are taken.
+        const Names & column_names_to_sum,
+        size_t max_block_size);
+
+    /// Stores numbers of key-columns and value-columns.
+    struct MapDescription
+    {
+        std::vector<size_t> key_col_nums;
+        std::vector<size_t> val_col_nums;
+    };
+
+    struct ColumnsDefinition
+    {
+        /// Columns with which values should be summed.
+        ColumnNumbers column_numbers_not_to_aggregate;
+        /// Columns which should be aggregated.
+        std::vector<detail::AggregateDescription> columns_to_aggregate;
+        /// Mapping for nested columns.
+        std::vector<MapDescription> maps_to_sum;
+
+        size_t getNumColumns() const { return column_numbers_not_to_aggregate.size() + columns_to_aggregate.size(); }
+    };
+
+    String getName() const override { return "SummingSortedTransform"; }
+    void work() override;
+
+protected:
+    void initializeInputs() override;
+    void consume(Chunk chunk, size_t input_number) override;
+
+private:
+    Row current_row;
+    bool current_row_is_zero = true; /// Are all summed columns zero (or empty)? It is updated incrementally.
+
+    ColumnsDefinition columns_definition;
+    detail::SummingMergedData merged_data;
+
+    SortDescription description;
+
+    /// Chunks currently being merged.
+    std::vector<Chunk> source_chunks;
+    SortCursorImpls cursors;
+
+    /// In merging algorithm, we need to compare current sort key with the last one.
+    /// So, sorting columns for the last row need to be stored.
+    /// In order to do it, we extend lifetime of last chunk and its sort columns (from corresponding sort cursor).
+    Chunk last_chunk;
+    ColumnRawPtrs last_chunk_sort_columns; /// Point to last_chunk if valid.
+
+    detail::RowRef last_key;
 
     SortingHeap<SortCursor> queue;
     bool is_queue_initialized = false;
 
-    void insertRow();
     void merge();
     void updateCursor(Chunk chunk, size_t source_num);
+    void addRow(SortCursor & cursor);
+    void insertCurrentRowIfNeeded();
 };
 
 }
diff --git a/dbms/src/Processors/Merges/VersionedCollapsingTransform.cpp b/dbms/src/Processors/Merges/VersionedCollapsingTransform.cpp
index 4042c146724..8b8b2bfa063 100644
--- a/dbms/src/Processors/Merges/VersionedCollapsingTransform.cpp
+++ b/dbms/src/Processors/Merges/VersionedCollapsingTransform.cpp
@@ -17,6 +17,8 @@ VersionedCollapsingTransform::VersionedCollapsingTransform(
     , merged_data(header.cloneEmptyColumns(), use_average_block_sizes, max_block_size)
     , description(std::move(description_))
     , out_row_sources_buf(out_row_sources_buf_)
+    , source_chunks(num_inputs)
+    , cursors(num_inputs)
     , max_rows_in_queue(MAX_ROWS_IN_MULTIVERSION_QUEUE - 1) /// -1 for +1 in FixedSizeDequeWithGaps's internal buffer
     , current_keys(max_rows_in_queue)
     , chunk_allocator(num_inputs + max_rows_in_queue + 1) /// +1 just in case (for current_row)
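
Note on the algorithm behind the SummingSortedTransform changes above: rows arrive sorted by the primary-key columns, consecutive rows with an equal key are collapsed into a single row whose value columns are summed, and a collapsed row whose summed values are all zero is not emitted at all. The following minimal sketch shows that idea in isolation; the Row type, the summingMerge() function and the column layout are invented for illustration and are not the ClickHouse API.

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <vector>

struct Row
{
    std::vector<int64_t> key;     // sorting (primary key) columns
    std::vector<int64_t> values;  // columns to be summed
};

std::vector<Row> summingMerge(const std::vector<Row> & sorted_input)
{
    std::vector<Row> result;
    Row current;             // accumulator, plays the role of current_row
    bool has_group = false;  // plays the role of has_previous_group

    auto flush = [&]
    {
        // Drop a "zero" row: if every summed column is zero, emit nothing.
        bool all_zero = true;
        for (int64_t v : current.values)
            all_zero = all_zero && v == 0;
        if (!all_zero)
            result.push_back(current);
    };

    for (const Row & row : sorted_input)
    {
        if (has_group && row.key == current.key)
        {
            // Same key as the previous row: keep accumulating, emit nothing yet.
            for (size_t i = 0; i < current.values.size(); ++i)
                current.values[i] += row.values[i];
        }
        else
        {
            if (has_group)
                flush();      // key changed: emit the finished group
            current = row;    // start a new group (like setRow(current_row, ...))
            has_group = true;
        }
    }

    if (has_group)
        flush();              // emit the last group

    return result;
}

int main()
{
    // Key 1 sums to zero and is dropped; key 2 prints "2 -> 12".
    std::vector<Row> input = {{{1}, {10}}, {{1}, {-10}}, {{2}, {5}}, {{2}, {7}}};
    for (const Row & row : summingMerge(input))
        std::cout << row.key[0] << " -> " << row.values[0] << '\n';
    return 0;
}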
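
A second sketch, for the per-column mechanics of insertCurrentRowIfNeeded() and addRow(): each aggregated column materializes its state into the output column when the key changes, the state is destroyed even if materialization throws, and if the finished row turns out to be all zeros the freshly inserted values are rolled back (compare desc.merged_column->popBack(1) above). SumState, OutputColumn and finishGroup() are invented stand-ins; ClickHouse uses IAggregateFunction states and IColumn instead, so this only illustrates the pattern, not the real implementation.

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <vector>

struct SumState
{
    int64_t sum = 0;
    bool created = false;
    void destroy() { sum = 0; created = false; }  // stands in for desc.destroyState()
};

struct OutputColumn
{
    std::vector<int64_t> data;
    void insert(int64_t x) { data.push_back(x); }
    bool lastIsDefault() const { return data.back() == 0; }  // like isDefaultAt(size() - 1)
    void popBack(size_t n) { data.resize(data.size() - n); }
};

/// Materialize every per-column state into its output column. Returns true if
/// the finished row was kept, false if it was rolled back as an all-zero row.
bool finishGroup(std::vector<SumState> & states, std::vector<OutputColumn> & columns)
{
    bool row_is_zero = true;

    for (size_t i = 0; i < states.size(); ++i)
    {
        if (!states[i].created)
        {
            columns[i].insert(0);  // like merged_column->insertDefault()
            continue;
        }

        try
        {
            columns[i].insert(states[i].sum);  // like insertResultInto()
            row_is_zero = row_is_zero && columns[i].lastIsDefault();
        }
        catch (...)
        {
            states[i].destroy();  // never leak the state if materialization throws
            throw;
        }
        states[i].destroy();      // reset the state for the next key
    }

    if (row_is_zero)
    {
        // Rollback: the collapsed row summed to zero in every column, so the
        // values inserted above are popped back and the row is not emitted.
        for (OutputColumn & column : columns)
            column.popBack(1);
        return false;
    }
    return true;
}

int main()
{
    std::vector<SumState> states{{5, true}, {0, false}};
    std::vector<OutputColumn> columns(2);
    std::cout << (finishGroup(states, columns) ? "row kept" : "row rolled back") << '\n';
    return 0;
}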