diff --git a/dbms/src/DataStreams/AggregatingSortedBlockInputStream.cpp b/dbms/src/DataStreams/AggregatingSortedBlockInputStream.cpp index 0f27cfdb2ca..7b431e206e9 100644 --- a/dbms/src/DataStreams/AggregatingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/AggregatingSortedBlockInputStream.cpp @@ -12,15 +12,46 @@ namespace ErrorCodes } +AggregatingSortedBlockInputStream::AggregatingSortedBlockInputStream( + const BlockInputStreams & inputs_, const SortDescription & description_, size_t max_block_size_) + : MergingSortedBlockInputStream(inputs_, description_, max_block_size_) +{ + /// Fill in the column numbers that need to be aggregated. + for (size_t i = 0; i < num_columns; ++i) + { + ColumnWithTypeAndName & column = header.safeGetByPosition(i); + + /// We leave only states of aggregate functions. + if (!startsWith(column.type->getName(), "AggregateFunction")) + { + column_numbers_not_to_aggregate.push_back(i); + continue; + } + + /// Included into PK? + SortDescription::const_iterator it = description.begin(); + for (; it != description.end(); ++it) + if (it->column_name == column.name || (it->column_name.empty() && it->column_number == i)) + break; + + if (it != description.end()) + { + column_numbers_not_to_aggregate.push_back(i); + continue; + } + + column_numbers_to_aggregate.push_back(i); + } +} + + Block AggregatingSortedBlockInputStream::readImpl() { if (finished) return Block(); - Block header; MutableColumns merged_columns; - - init(header, merged_columns); + init(merged_columns); if (has_collation) throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::LOGICAL_ERROR); @@ -28,37 +59,6 @@ Block AggregatingSortedBlockInputStream::readImpl() if (merged_columns.empty()) return Block(); - /// Additional initialization. - if (next_key.empty()) - { - /// Fill in the column numbers that need to be aggregated. - for (size_t i = 0; i < num_columns; ++i) - { - ColumnWithTypeAndName & column = header.safeGetByPosition(i); - - /// We leave only states of aggregate functions. - if (!startsWith(column.type->getName(), "AggregateFunction")) - { - column_numbers_not_to_aggregate.push_back(i); - continue; - } - - /// Included into PK? - SortDescription::const_iterator it = description.begin(); - for (; it != description.end(); ++it) - if (it->column_name == column.name || (it->column_name.empty() && it->column_number == i)) - break; - - if (it != description.end()) - { - column_numbers_not_to_aggregate.push_back(i); - continue; - } - - column_numbers_to_aggregate.push_back(i); - } - } - columns_to_aggregate.resize(column_numbers_to_aggregate.size()); for (size_t i = 0, size = columns_to_aggregate.size(); i < size; ++i) columns_to_aggregate[i] = typeid_cast(merged_columns[column_numbers_to_aggregate[i]].get()); diff --git a/dbms/src/DataStreams/AggregatingSortedBlockInputStream.h b/dbms/src/DataStreams/AggregatingSortedBlockInputStream.h index e428b3b7e20..5047158aa2d 100644 --- a/dbms/src/DataStreams/AggregatingSortedBlockInputStream.h +++ b/dbms/src/DataStreams/AggregatingSortedBlockInputStream.h @@ -21,10 +21,8 @@ namespace DB class AggregatingSortedBlockInputStream : public MergingSortedBlockInputStream { public: - AggregatingSortedBlockInputStream(BlockInputStreams inputs_, const SortDescription & description_, size_t max_block_size_) - : MergingSortedBlockInputStream(inputs_, description_, max_block_size_) - { - } + AggregatingSortedBlockInputStream( + const BlockInputStreams & inputs_, const SortDescription & description_, size_t max_block_size_); String getName() const override { return "AggregatingSorted"; } diff --git a/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp b/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp index 9b70bd6b89a..01127b5029b 100644 --- a/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp @@ -108,10 +108,8 @@ Block CollapsingSortedBlockInputStream::readImpl() if (finished) return {}; - Block header; MutableColumns merged_columns; - - init(header, merged_columns); + init(merged_columns); if (has_collation) throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::LOGICAL_ERROR); @@ -119,11 +117,6 @@ Block CollapsingSortedBlockInputStream::readImpl() if (merged_columns.empty()) return {}; - /// Additional initialization. - if (first_negative.empty()) - sign_column_number = header.getPositionByName(sign_column); - - merge(merged_columns, queue); return header.cloneWithColumns(std::move(merged_columns)); } diff --git a/dbms/src/DataStreams/CollapsingSortedBlockInputStream.h b/dbms/src/DataStreams/CollapsingSortedBlockInputStream.h index 7280dda02b1..e8650b4efc5 100644 --- a/dbms/src/DataStreams/CollapsingSortedBlockInputStream.h +++ b/dbms/src/DataStreams/CollapsingSortedBlockInputStream.h @@ -25,10 +25,10 @@ class CollapsingSortedBlockInputStream : public MergingSortedBlockInputStream public: CollapsingSortedBlockInputStream( BlockInputStreams inputs_, const SortDescription & description_, - const String & sign_column_, size_t max_block_size_, WriteBuffer * out_row_sources_buf_ = nullptr) + const String & sign_column, size_t max_block_size_, WriteBuffer * out_row_sources_buf_ = nullptr) : MergingSortedBlockInputStream(inputs_, description_, max_block_size_, 0, out_row_sources_buf_) - , sign_column(sign_column_) { + sign_column_number = header.getPositionByName(sign_column); } String getName() const override { return "CollapsingSorted"; } @@ -38,8 +38,7 @@ protected: Block readImpl() override; private: - String sign_column; - size_t sign_column_number = 0; + size_t sign_column_number; Logger * log = &Logger::get("CollapsingSortedBlockInputStream"); diff --git a/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.cpp b/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.cpp index 5da53d8eea5..0a2273d45a9 100644 --- a/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.cpp @@ -12,6 +12,31 @@ namespace ErrorCodes } +GraphiteRollupSortedBlockInputStream::GraphiteRollupSortedBlockInputStream( + const BlockInputStreams & inputs_, const SortDescription & description_, size_t max_block_size_, + const Graphite::Params & params, time_t time_of_merge) + : MergingSortedBlockInputStream(inputs_, description_, max_block_size_), + params(params), time_of_merge(time_of_merge) +{ + size_t max_size_of_aggregate_state = 0; + for (const auto & pattern : params.patterns) + if (pattern.function->sizeOfData() > max_size_of_aggregate_state) + max_size_of_aggregate_state = pattern.function->sizeOfData(); + + place_for_aggregate_state.resize(max_size_of_aggregate_state); + + /// Memoize column numbers in block. + path_column_num = header.getPositionByName(params.path_column_name); + time_column_num = header.getPositionByName(params.time_column_name); + value_column_num = header.getPositionByName(params.value_column_name); + version_column_num = header.getPositionByName(params.version_column_name); + + for (size_t i = 0; i < num_columns; ++i) + if (i != time_column_num && i != value_column_num && i != version_column_num) + unmodified_column_numbers.push_back(i); +} + + const Graphite::Pattern * GraphiteRollupSortedBlockInputStream::selectPatternForPath(StringRef path) const { for (const auto & pattern : params.patterns) @@ -68,10 +93,8 @@ Block GraphiteRollupSortedBlockInputStream::readImpl() if (finished) return Block(); - Block header; MutableColumns merged_columns; - - init(header, merged_columns); + init(merged_columns); if (has_collation) throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::LOGICAL_ERROR); @@ -79,27 +102,6 @@ Block GraphiteRollupSortedBlockInputStream::readImpl() if (merged_columns.empty()) return Block(); - /// Additional initialization. - if (is_first) - { - size_t max_size_of_aggregate_state = 0; - for (const auto & pattern : params.patterns) - if (pattern.function->sizeOfData() > max_size_of_aggregate_state) - max_size_of_aggregate_state = pattern.function->sizeOfData(); - - place_for_aggregate_state.resize(max_size_of_aggregate_state); - - /// Memoize column numbers in block. - path_column_num = header.getPositionByName(params.path_column_name); - time_column_num = header.getPositionByName(params.time_column_name); - value_column_num = header.getPositionByName(params.value_column_name); - version_column_num = header.getPositionByName(params.version_column_name); - - for (size_t i = 0; i < num_columns; ++i) - if (i != time_column_num && i != value_column_num && i != version_column_num) - unmodified_column_numbers.push_back(i); - } - merge(merged_columns, queue); return header.cloneWithColumns(std::move(merged_columns)); } diff --git a/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.h b/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.h index c256d27064d..15dfe7c0f4d 100644 --- a/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.h +++ b/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.h @@ -126,12 +126,8 @@ class GraphiteRollupSortedBlockInputStream : public MergingSortedBlockInputStrea { public: GraphiteRollupSortedBlockInputStream( - BlockInputStreams inputs_, const SortDescription & description_, size_t max_block_size_, - const Graphite::Params & params, time_t time_of_merge) - : MergingSortedBlockInputStream(inputs_, description_, max_block_size_), - params(params), time_of_merge(time_of_merge) - { - } + const BlockInputStreams & inputs_, const SortDescription & description_, size_t max_block_size_, + const Graphite::Params & params, time_t time_of_merge); String getName() const override { return "GraphiteRollupSorted"; } diff --git a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp index 628de41b32e..abfcdc89698 100644 --- a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp @@ -63,14 +63,21 @@ static void enrichBlockWithConstants(Block & block, const Block & header) } +MergeSortingBlockInputStream::MergeSortingBlockInputStream( + const BlockInputStreamPtr & input, SortDescription & description_, + size_t max_merged_block_size_, size_t limit_, + size_t max_bytes_before_external_sort_, const std::string & tmp_path_) + : description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_), + max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_path(tmp_path_) +{ + children.push_back(input); + header = getHeader(); + removeConstantsFromSortDescription(header, description); +} + + Block MergeSortingBlockInputStream::readImpl() { - if (!header) - { - header = getHeader(); - removeConstantsFromSortDescription(header, description); - } - /** Algorithm: * - read to memory blocks from source stream; * - if too many of them and if external sorting is enabled, diff --git a/dbms/src/DataStreams/MergeSortingBlockInputStream.h b/dbms/src/DataStreams/MergeSortingBlockInputStream.h index 416dc0ecce7..498837f3bff 100644 --- a/dbms/src/DataStreams/MergeSortingBlockInputStream.h +++ b/dbms/src/DataStreams/MergeSortingBlockInputStream.h @@ -73,12 +73,7 @@ public: /// limit - if not 0, allowed to return just first 'limit' rows in sorted order. MergeSortingBlockInputStream(const BlockInputStreamPtr & input, SortDescription & description_, size_t max_merged_block_size_, size_t limit_, - size_t max_bytes_before_external_sort_, const std::string & tmp_path_) - : description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_), - max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_path(tmp_path_) - { - children.push_back(input); - } + size_t max_bytes_before_external_sort_, const std::string & tmp_path_); String getName() const override { return "MergeSorting"; } diff --git a/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp b/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp index be90a00e4b9..62b32330679 100644 --- a/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp @@ -15,15 +15,17 @@ namespace ErrorCodes MergingSortedBlockInputStream::MergingSortedBlockInputStream( - BlockInputStreams & inputs_, const SortDescription & description_, - size_t max_block_size_, size_t limit_, WriteBuffer * out_row_sources_buf_, bool quiet_) + const BlockInputStreams & inputs_, const SortDescription & description_, + size_t max_block_size_, size_t limit_, WriteBuffer * out_row_sources_buf_, bool quiet_) : description(description_), max_block_size(max_block_size_), limit(limit_), quiet(quiet_) , source_blocks(inputs_.size()), cursors(inputs_.size()), out_row_sources_buf(out_row_sources_buf_) { children.insert(children.end(), inputs_.begin(), inputs_.end()); + header = children.at(0)->getHeader(); + num_columns = header.columns(); } -void MergingSortedBlockInputStream::init(Block & header, MutableColumns & merged_columns) +void MergingSortedBlockInputStream::init(MutableColumns & merged_columns) { /// Read the first blocks, initialize the queue. if (first) @@ -44,9 +46,6 @@ void MergingSortedBlockInputStream::init(Block & header, MutableColumns & merged if (rows == 0) continue; - if (!num_columns) - num_columns = shared_block_ptr->columns(); - if (expected_block_size < rows) expected_block_size = std::min(rows, max_block_size); @@ -62,32 +61,9 @@ void MergingSortedBlockInputStream::init(Block & header, MutableColumns & merged initQueue(queue); } - /// Initialize the result. - - /// We clone the structure of the first non-empty source block. - { - auto it = source_blocks.cbegin(); - for (; it != source_blocks.cend(); ++it) - { - const SharedBlockPtr & shared_block_ptr = *it; - - if (*shared_block_ptr) - { - header = shared_block_ptr->cloneEmpty(); - break; - } - } - - /// If all the input blocks are empty. - if (it == source_blocks.cend()) - return; - } - /// Let's check that all source blocks have the same structure. - for (auto it = source_blocks.cbegin(); it != source_blocks.cend(); ++it) + for (const SharedBlockPtr & shared_block_ptr : source_blocks) { - const SharedBlockPtr & shared_block_ptr = *it; - if (!*shared_block_ptr) continue; @@ -120,10 +96,9 @@ Block MergingSortedBlockInputStream::readImpl() if (children.size() == 1) return children[0]->read(); - Block header; MutableColumns merged_columns; - init(header, merged_columns); + init(merged_columns); if (merged_columns.empty()) return {}; diff --git a/dbms/src/DataStreams/MergingSortedBlockInputStream.h b/dbms/src/DataStreams/MergingSortedBlockInputStream.h index 6391f52dcd5..825a9e1fcc3 100644 --- a/dbms/src/DataStreams/MergingSortedBlockInputStream.h +++ b/dbms/src/DataStreams/MergingSortedBlockInputStream.h @@ -65,8 +65,8 @@ public: * quiet - don't log profiling info */ MergingSortedBlockInputStream( - BlockInputStreams & inputs_, const SortDescription & description_, size_t max_block_size_, - size_t limit_ = 0, WriteBuffer * out_row_sources_buf_ = nullptr, bool quiet_ = false); + const BlockInputStreams & inputs_, const SortDescription & description_, size_t max_block_size_, + size_t limit_ = 0, WriteBuffer * out_row_sources_buf_ = nullptr, bool quiet_ = false); String getName() const override { return "MergingSorted"; } @@ -74,7 +74,7 @@ public: bool isSortedOutput() const override { return true; } const SortDescription & getSortDescription() const override { return description; } - Block getHeader() const override { return children.at(0)->getHeader(); } + Block getHeader() const override { return header; } protected: struct RowRef @@ -120,14 +120,16 @@ protected: void readSuffixImpl() override; - /// Initializes the queue and the next result block. - void init(Block & header, MutableColumns & merged_columns); + /// Initializes the queue and the columns of next result block. + void init(MutableColumns & merged_columns); /// Gets the next block from the source corresponding to the `current`. template void fetchNextBlock(const TSortCursor & current, std::priority_queue & queue); + Block header; + const SortDescription description; const size_t max_block_size; size_t limit; diff --git a/dbms/src/DataStreams/ReplacingSortedBlockInputStream.cpp b/dbms/src/DataStreams/ReplacingSortedBlockInputStream.cpp index 553e3a01e4b..8fcfdfe2d58 100644 --- a/dbms/src/DataStreams/ReplacingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/ReplacingSortedBlockInputStream.cpp @@ -35,10 +35,8 @@ Block ReplacingSortedBlockInputStream::readImpl() if (finished) return Block(); - Block header; MutableColumns merged_columns; - - init(header, merged_columns); + init(merged_columns); if (has_collation) throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::LOGICAL_ERROR); @@ -46,13 +44,6 @@ Block ReplacingSortedBlockInputStream::readImpl() if (merged_columns.empty()) return Block(); - /// Additional initialization. - if (selected_row.empty()) - { - if (!version_column.empty()) - version_column_number = header.getPositionByName(version_column); - } - merge(merged_columns, queue); return header.cloneWithColumns(std::move(merged_columns)); } diff --git a/dbms/src/DataStreams/ReplacingSortedBlockInputStream.h b/dbms/src/DataStreams/ReplacingSortedBlockInputStream.h index b8592a0e5b6..d0a7594c69a 100644 --- a/dbms/src/DataStreams/ReplacingSortedBlockInputStream.h +++ b/dbms/src/DataStreams/ReplacingSortedBlockInputStream.h @@ -15,11 +15,13 @@ namespace DB class ReplacingSortedBlockInputStream : public MergingSortedBlockInputStream { public: - ReplacingSortedBlockInputStream(BlockInputStreams inputs_, const SortDescription & description_, - const String & version_column_, size_t max_block_size_, WriteBuffer * out_row_sources_buf_ = nullptr) - : MergingSortedBlockInputStream(inputs_, description_, max_block_size_, 0, out_row_sources_buf_), - version_column(version_column_) + ReplacingSortedBlockInputStream( + const BlockInputStreams & inputs_, const SortDescription & description_, + const String & version_column, size_t max_block_size_, WriteBuffer * out_row_sources_buf_ = nullptr) + : MergingSortedBlockInputStream(inputs_, description_, max_block_size_, 0, out_row_sources_buf_) { + if (!version_column.empty()) + version_column_number = header.getPositionByName(version_column); } String getName() const override { return "ReplacingSorted"; } @@ -29,7 +31,6 @@ protected: Block readImpl() override; private: - String version_column; ssize_t version_column_number = -1; Logger * log = &Logger::get("ReplacingSortedBlockInputStream"); diff --git a/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp b/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp index e79366ca02d..e914b8f8b65 100644 --- a/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp @@ -24,6 +24,168 @@ namespace ErrorCodes } +namespace +{ + bool isInPrimaryKey(const SortDescription & description, const std::string & name, const size_t number) + { + for (auto & desc : description) + if (desc.column_name == name || (desc.column_name.empty() && desc.column_number == number)) + return true; + + return false; + } +} + + +SummingSortedBlockInputStream::SummingSortedBlockInputStream( + const BlockInputStreams & inputs_, + const SortDescription & description_, + /// List of columns to be summed. If empty, all numeric columns that are not in the description are taken. + const Names & column_names_to_sum, + size_t max_block_size_) + : MergingSortedBlockInputStream(inputs_, description_, max_block_size_) +{ + current_row.resize(num_columns); + + /// name of nested structure -> the column numbers that refer to it. + std::unordered_map> discovered_maps; + + /** Fill in the column numbers, which must be summed. + * This can only be numeric columns that are not part of the sort key. + * If a non-empty column_names_to_sum is specified, then we only take these columns. + * Some columns from column_names_to_sum may not be found. This is ignored. + */ + for (size_t i = 0; i < num_columns; ++i) + { + const ColumnWithTypeAndName & column = header.safeGetByPosition(i); + + /// Discover nested Maps and find columns for summation + if (typeid_cast(column.type.get())) + { + const auto map_name = Nested::extractTableName(column.name); + /// if nested table name ends with `Map` it is a possible candidate for special handling + if (map_name == column.name || !endsWith(map_name, "Map")) + { + column_numbers_not_to_aggregate.push_back(i); + continue; + } + + discovered_maps[map_name].emplace_back(i); + } + else + { + if (!column.type->isSummable()) + { + column_numbers_not_to_aggregate.push_back(i); + continue; + } + + /// Are they inside the PK? + if (isInPrimaryKey(description, column.name, i)) + { + column_numbers_not_to_aggregate.push_back(i); + continue; + } + + if (column_names_to_sum.empty() + || column_names_to_sum.end() != + std::find(column_names_to_sum.begin(), column_names_to_sum.end(), column.name)) + { + // Create aggregator to sum this column + AggregateDescription desc; + desc.column_numbers = {i}; + desc.init("sumWithOverflow", {column.type}); + columns_to_aggregate.emplace_back(std::move(desc)); + } + else + { + // Column is not going to be summed, use last value + column_numbers_not_to_aggregate.push_back(i); + } + } + } + + /// select actual nested Maps from list of candidates + for (const auto & map : discovered_maps) + { + /// map should contain at least two elements (key -> value) + if (map.second.size() < 2) + { + for (auto col : map.second) + column_numbers_not_to_aggregate.push_back(col); + continue; + } + + /// no elements of map could be in primary key + auto column_num_it = map.second.begin(); + for (; column_num_it != map.second.end(); ++column_num_it) + if (isInPrimaryKey(description, header.safeGetByPosition(*column_num_it).name, *column_num_it)) + break; + if (column_num_it != map.second.end()) + { + for (auto col : map.second) + column_numbers_not_to_aggregate.push_back(col); + continue; + } + + DataTypes argument_types; + AggregateDescription desc; + MapDescription map_desc; + + column_num_it = map.second.begin(); + for (; column_num_it != map.second.end(); ++column_num_it) + { + const ColumnWithTypeAndName & key_col = header.safeGetByPosition(*column_num_it); + const String & name = key_col.name; + const IDataType & nested_type = *static_cast(key_col.type.get())->getNestedType(); + + if (column_num_it == map.second.begin() + || endsWith(name, "ID") + || endsWith(name, "Key") + || endsWith(name, "Type")) + { + if (!nested_type.isValueRepresentedByInteger()) + break; + + map_desc.key_col_nums.push_back(*column_num_it); + } + else + { + if (!nested_type.isSummable()) + break; + + map_desc.val_col_nums.push_back(*column_num_it); + } + + // Add column to function arguments + desc.column_numbers.push_back(*column_num_it); + argument_types.push_back(key_col.type); + } + + if (column_num_it != map.second.end()) + { + for (auto col : map.second) + column_numbers_not_to_aggregate.push_back(col); + continue; + } + + if (map_desc.key_col_nums.size() == 1) + { + // Create summation for all value columns in the map + desc.init("sumMap", argument_types); + columns_to_aggregate.emplace_back(std::move(desc)); + } + else + { + // Fall back to legacy mergeMaps for composite keys + for (auto col : map.second) + column_numbers_not_to_aggregate.push_back(col); + maps_to_sum.emplace_back(std::move(map_desc)); + } + } +} + + void SummingSortedBlockInputStream::insertCurrentRowIfNeeded(MutableColumns & merged_columns, bool force_insertion) { for (auto & desc : columns_to_aggregate) @@ -78,28 +240,13 @@ void SummingSortedBlockInputStream::insertCurrentRowIfNeeded(MutableColumns & me } -namespace -{ - bool isInPrimaryKey(const SortDescription & description, const std::string & name, const size_t number) - { - for (auto & desc : description) - if (desc.column_name == name || (desc.column_name.empty() && desc.column_number == number)) - return true; - - return false; - } -} - - Block SummingSortedBlockInputStream::readImpl() { if (finished) return Block(); - Block header; MutableColumns merged_columns; - - init(header, merged_columns); + init(merged_columns); if (has_collation) throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::LOGICAL_ERROR); @@ -107,150 +254,7 @@ Block SummingSortedBlockInputStream::readImpl() if (merged_columns.empty()) return {}; - /// Additional initialization. - if (current_row.empty()) - { - current_row.resize(num_columns); - - /// name of nested structure -> the column numbers that refer to it. - std::unordered_map> discovered_maps; - - /** Fill in the column numbers, which must be summed. - * This can only be numeric columns that are not part of the sort key. - * If a non-empty column_names_to_sum is specified, then we only take these columns. - * Some columns from column_names_to_sum may not be found. This is ignored. - */ - for (size_t i = 0; i < num_columns; ++i) - { - const ColumnWithTypeAndName & column = header.safeGetByPosition(i); - - /// Discover nested Maps and find columns for summation - if (typeid_cast(column.type.get())) - { - const auto map_name = Nested::extractTableName(column.name); - /// if nested table name ends with `Map` it is a possible candidate for special handling - if (map_name == column.name || !endsWith(map_name, "Map")) - { - column_numbers_not_to_aggregate.push_back(i); - continue; - } - - discovered_maps[map_name].emplace_back(i); - } - else - { - if (!column.type->isSummable()) - { - column_numbers_not_to_aggregate.push_back(i); - continue; - } - - /// Are they inside the PK? - if (isInPrimaryKey(description, column.name, i)) - { - column_numbers_not_to_aggregate.push_back(i); - continue; - } - - if (column_names_to_sum.empty() - || column_names_to_sum.end() != - std::find(column_names_to_sum.begin(), column_names_to_sum.end(), column.name)) - { - // Create aggregator to sum this column - AggregateDescription desc; - desc.column_numbers = {i}; - desc.init("sumWithOverflow", {column.type}); - columns_to_aggregate.emplace_back(std::move(desc)); - } - else - { - // Column is not going to be summed, use last value - column_numbers_not_to_aggregate.push_back(i); - } - } - } - - /// select actual nested Maps from list of candidates - for (const auto & map : discovered_maps) - { - /// map should contain at least two elements (key -> value) - if (map.second.size() < 2) - { - for (auto col : map.second) - column_numbers_not_to_aggregate.push_back(col); - continue; - } - - /// no elements of map could be in primary key - auto column_num_it = map.second.begin(); - for (; column_num_it != map.second.end(); ++column_num_it) - if (isInPrimaryKey(description, header.safeGetByPosition(*column_num_it).name, *column_num_it)) - break; - if (column_num_it != map.second.end()) - { - for (auto col : map.second) - column_numbers_not_to_aggregate.push_back(col); - continue; - } - - DataTypes argument_types = {}; - AggregateDescription desc; - MapDescription map_desc; - - column_num_it = map.second.begin(); - for (; column_num_it != map.second.end(); ++column_num_it) - { - const ColumnWithTypeAndName & key_col = header.safeGetByPosition(*column_num_it); - const String & name = key_col.name; - const IDataType & nested_type = *static_cast(key_col.type.get())->getNestedType(); - - if (column_num_it == map.second.begin() - || endsWith(name, "ID") - || endsWith(name, "Key") - || endsWith(name, "Type")) - { - if (!nested_type.isValueRepresentedByInteger()) - break; - - map_desc.key_col_nums.push_back(*column_num_it); - } - else - { - if (!nested_type.isSummable()) - break; - - map_desc.val_col_nums.push_back(*column_num_it); - } - - // Add column to function arguments - desc.column_numbers.push_back(*column_num_it); - argument_types.push_back(key_col.type); - } - - if (column_num_it != map.second.end()) - { - for (auto col : map.second) - column_numbers_not_to_aggregate.push_back(col); - continue; - } - - if (map_desc.key_col_nums.size() == 1) - { - // Create summation for all value columns in the map - desc.init("sumMap", argument_types); - columns_to_aggregate.emplace_back(std::move(desc)); - } - else - { - // Fall back to legacy mergeMaps for composite keys - for (auto col : map.second) - column_numbers_not_to_aggregate.push_back(col); - maps_to_sum.emplace_back(std::move(map_desc)); - } - } - } - - // Update aggregation result columns for current block + /// Update aggregation result columns for current block for (auto & desc : columns_to_aggregate) { // Wrap aggregated columns in a tuple to match function signature diff --git a/dbms/src/DataStreams/SummingSortedBlockInputStream.h b/dbms/src/DataStreams/SummingSortedBlockInputStream.h index 62df3863fc6..78b61903d01 100644 --- a/dbms/src/DataStreams/SummingSortedBlockInputStream.h +++ b/dbms/src/DataStreams/SummingSortedBlockInputStream.h @@ -24,14 +24,12 @@ namespace ErrorCodes class SummingSortedBlockInputStream : public MergingSortedBlockInputStream { public: - SummingSortedBlockInputStream(BlockInputStreams inputs_, + SummingSortedBlockInputStream( + const BlockInputStreams & inputs_, const SortDescription & description_, /// List of columns to be summed. If empty, all numeric columns that are not in the description are taken. const Names & column_names_to_sum_, - size_t max_block_size_) - : MergingSortedBlockInputStream(inputs_, description_, max_block_size_), column_names_to_sum(column_names_to_sum_) - { - } + size_t max_block_size_); String getName() const override { return "SummingSorted"; } @@ -46,7 +44,6 @@ private: bool finished = false; /// Columns with which values should be summed. - Names column_names_to_sum; /// If set, it is converted to column_numbers_to_aggregate when initialized. ColumnNumbers column_numbers_not_to_aggregate; /** A table can have nested tables that are treated in a special way. diff --git a/dbms/src/DataStreams/VersionedCollapsingSortedBlockInputStream.cpp b/dbms/src/DataStreams/VersionedCollapsingSortedBlockInputStream.cpp index 45c529470c0..071752137c6 100644 --- a/dbms/src/DataStreams/VersionedCollapsingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/VersionedCollapsingSortedBlockInputStream.cpp @@ -2,6 +2,7 @@ #include #include + namespace DB { @@ -11,6 +12,20 @@ namespace ErrorCodes extern const int NOT_IMPLEMENTED; } + +VersionedCollapsingSortedBlockInputStream::VersionedCollapsingSortedBlockInputStream( + const BlockInputStreams & inputs_, const SortDescription & description_, + const String & sign_column_, size_t max_block_size_, bool can_collapse_all_rows_, + WriteBuffer * out_row_sources_buf_) + : MergingSortedBlockInputStream(inputs_, description_, max_block_size_, 0, out_row_sources_buf_) + , max_rows_in_queue(std::min(std::max(3, max_block_size_), MAX_ROWS_IN_MULTIVERSION_QUEUE) - 2) + , current_keys(max_rows_in_queue + 1), can_collapse_all_rows(can_collapse_all_rows_) +{ + sign_column_number = header.getPositionByName(sign_column_); +} + + + inline ALWAYS_INLINE static void writeRowSourcePart(WriteBuffer & buffer, RowSourcePart row_source) { if constexpr (sizeof(RowSourcePart) == 1) @@ -52,12 +67,8 @@ Block VersionedCollapsingSortedBlockInputStream::readImpl() if (finished) return {}; - Block header; MutableColumns merged_columns; - - bool is_initialized = !first; - - init(header, merged_columns); + init(merged_columns); if (has_collation) throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::NOT_IMPLEMENTED); @@ -65,11 +76,6 @@ Block VersionedCollapsingSortedBlockInputStream::readImpl() if (merged_columns.empty()) return {}; - /// Additional initialization. - if (!is_initialized) - sign_column_number = header.getPositionByName(sign_column); - - merge(merged_columns, queue); return header.cloneWithColumns(std::move(merged_columns)); } diff --git a/dbms/src/DataStreams/VersionedCollapsingSortedBlockInputStream.h b/dbms/src/DataStreams/VersionedCollapsingSortedBlockInputStream.h index 1c299e78e81..636ee5e3833 100644 --- a/dbms/src/DataStreams/VersionedCollapsingSortedBlockInputStream.h +++ b/dbms/src/DataStreams/VersionedCollapsingSortedBlockInputStream.h @@ -6,6 +6,7 @@ #include + namespace DB { @@ -16,6 +17,7 @@ namespace ErrorCodes static const size_t MAX_ROWS_IN_MULTIVERSION_QUEUE = 8192; + /* Deque with fixed memory size. Allows pushing gaps. * frontGap() returns the number of gaps were inserted before front. * @@ -173,15 +175,9 @@ public: /// Don't need version column. It's in primary key. /// max_rows_in_queue should be about max_block_size_ if we won't store a lot of extra blocks (RowRef holds SharedBlockPtr). VersionedCollapsingSortedBlockInputStream( - BlockInputStreams inputs_, const SortDescription & description_, - const String & sign_column_, size_t max_block_size_, bool can_collapse_all_rows_, - WriteBuffer * out_row_sources_buf_ = nullptr) - : MergingSortedBlockInputStream(inputs_, description_, max_block_size_, 0, out_row_sources_buf_) - , sign_column(sign_column_) - , max_rows_in_queue(std::min(std::max(3, max_block_size_), MAX_ROWS_IN_MULTIVERSION_QUEUE) - 2) - , current_keys(max_rows_in_queue + 1), can_collapse_all_rows(can_collapse_all_rows_) - { - } + const BlockInputStreams & inputs_, const SortDescription & description_, + const String & sign_column_, size_t max_block_size_, bool can_collapse_all_rows_, + WriteBuffer * out_row_sources_buf_ = nullptr); String getName() const override { return "VersionedCollapsingSorted"; } @@ -190,8 +186,6 @@ protected: Block readImpl() override; private: - String sign_column; - size_t sign_column_number = 0; Logger * log = &Logger::get("VersionedCollapsingSortedBlockInputStream");