diff --git a/dbms/include/DB/DataStreams/SummingSortedBlockInputStream.h b/dbms/include/DB/DataStreams/SummingSortedBlockInputStream.h index a70665eb440..f48572cfb9d 100644 --- a/dbms/include/DB/DataStreams/SummingSortedBlockInputStream.h +++ b/dbms/include/DB/DataStreams/SummingSortedBlockInputStream.h @@ -5,6 +5,8 @@ #include #include #include +#include +#include namespace DB @@ -60,6 +62,14 @@ private: Names column_names_to_sum; /// Если задано - преобразуется в column_numbers_to_sum при инициализации. ColumnNumbers column_numbers_to_sum; + struct map_description + { + std::size_t key_col_num; + std::size_t val_col_num; + }; + + std::vector maps_to_sum; + Row current_key; /// Текущий первичный ключ. Row next_key; /// Первичный ключ следующей строки. @@ -97,12 +107,96 @@ private: bool operator() (Array & x) const { throw Exception("Cannot sum Arrays", ErrorCodes::LOGICAL_ERROR); } }; + using map_merge_t = std::map; + class MapSumVisitor : public StaticVisitor + { + public: + map_merge_t & map; + const Field & key; + + public: + MapSumVisitor(map_merge_t & map, const Field & key) : map(map), key(key) {} + + void operator()(const UInt64 val) const + { + const auto it = map.find(key); + if (it == std::end(map)) + map.emplace(key, Field{val}); + else + it->second.get() += val; + } + + void operator()(const Int64 val) const + { + const auto it = map.find(key); + if (it == std::end(map)) + map.emplace(key, Field{val}); + else + it->second.get() += val; + } + + void operator()(const Float64 val) const + { + const auto it = map.find(key); + if (it == std::end(map)) + map.emplace(key, Field{val}); + else + it->second.get() += val; + } + + void operator() (Null) const { throw Exception("Cannot merge Nulls", ErrorCodes::LOGICAL_ERROR); } + void operator() (String) const { throw Exception("Cannot merge Strings", ErrorCodes::LOGICAL_ERROR); } + void operator() (Array) const { throw Exception("Cannot merge Arrays", ErrorCodes::LOGICAL_ERROR); } + }; + /** Прибавить строчку под курсором к row. * Возвращает false, если результат получился нулевым. */ template bool addRow(Row & row, TSortCursor & cursor) { + /// merge maps + for (const auto & map : maps_to_sum) + { + auto & key_array = row[map.key_col_num].get(); + auto & val_array = row[map.val_col_num].get(); + if (key_array.size() != val_array.size()) + throw Exception{"Nested arrays have different sizes", ErrorCodes::LOGICAL_ERROR}; + + const auto key_field_rhs = (*cursor->all_columns[map.key_col_num])[cursor->pos]; + const auto val_field_rhs = (*cursor->all_columns[map.val_col_num])[cursor->pos]; + + const auto & key_array_rhs = key_field.get(); + const auto & val_array_rhs = val_field.get(); + if (key_array_rhs.size() != val_array_rhs.size()) + throw Exception{"Nested arrays have different sizes", ErrorCodes::LOGICAL_ERROR}; + + map_merge_t result; + + /// merge accumulator-row and current row from cursor using std::map + for (const auto i : ext::range(0, key_array.size())) + apply_visitor(MapSumVisitor{result, key_array[i]}, val_array[i]); + + for (const auto i : ext::range(0, key_array_rhs.size())) + apply_visitor(MapSumVisitor{result, key_array_rhs[i]}, val_array_rhs[i]); + + Array key_array_result; + Array val_array_result; + + /// skip items with value == 0 + for (const auto & pair : result) + { + if (!apply_visitor(FieldVisitorAccurateEquals{}, pair.second, nearestFieldType(0))) + { + key_array_result.emplace_back(pair.first); + val_array_result.emplace_back(pair.second); + } + } + + key_array = std::move(key_array_result); + val_array = std::move(val_array_result); + } + bool res = false; /// Есть ли хотя бы одно ненулевое число. for (size_t i = 0, size = column_numbers_to_sum.size(); i < size; ++i) diff --git a/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp b/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp index 4edac6b5222..d0d846ea57d 100644 --- a/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp @@ -1,4 +1,6 @@ #include +#include +#include namespace DB @@ -12,6 +14,24 @@ void SummingSortedBlockInputStream::insertCurrentRow(ColumnPlainPtrs & merged_co } +namespace +{ + bool endsWith(const std::string & s, const std::string & suffix) + { + return s.size() >= suffix.size() && 0 == strncmp(s.data() + s.size() - suffix.size(), suffix.data(), suffix.size()); + } + + bool isInPrimaryKey(const SortDescription & description, const std::string & name, const std::size_t number) + { + for (auto & desc : description) + if (desc.column_name == name || (desc.column_name.empty() && desc.column_number == number)) + return true; + + return false; + } +} + + Block SummingSortedBlockInputStream::readImpl() { if (finished) @@ -34,6 +54,7 @@ Block SummingSortedBlockInputStream::readImpl() current_key.resize(description.size()); next_key.resize(description.size()); + std::unordered_map> discovered_maps; /** Заполним номера столбцов, которые должны быть просуммированы. * Это могут быть только числовые столбцы, не входящие в ключ сортировки. * Если задан непустой список column_names_to_sum, то берём только эти столбцы. @@ -43,24 +64,62 @@ Block SummingSortedBlockInputStream::readImpl() { ColumnWithNameAndType & column = merged_block.getByPosition(i); - /// Оставляем только числовые типы. При чём, даты и даты-со-временем здесь такими не считаются. - if (!column.type->isNumeric() || column.type->getName() == "Date" || column.type->getName() == "DateTime") - continue; - - /// Входят ли в PK? - SortDescription::const_iterator it = description.begin(); - for (; it != description.end(); ++it) - if (it->column_name == column.name || (it->column_name.empty() && it->column_number == i)) - break; - - if (it != description.end()) - continue; - - if (column_names_to_sum.empty() - || column_names_to_sum.end() != std::find(column_names_to_sum.begin(), column_names_to_sum.end(), column.name)) + if (const auto array_type = typeid_cast(column.type.get())) { - column_numbers_to_sum.push_back(i); + const auto map_name = DataTypeNested::extractNestedTableName(column.name); + if (map_name == column.name || !endsWith(map_name, "Map")) + continue; + + discovered_maps[map_name].emplace_back(i); } + else + { + /// Оставляем только числовые типы. При чём, даты и даты-со-временем здесь такими не считаются. + if (!column.type->isNumeric() || column.type->getName() == "Date" || + column.type->getName() == "DateTime") + continue; + + /// Входят ли в PK? + if (isInPrimaryKey(description, column.name, i)) + continue; + + if (column_names_to_sum.empty() + || column_names_to_sum.end() != + std::find(column_names_to_sum.begin(), column_names_to_sum.end(), column.name)) + { + column_numbers_to_sum.push_back(i); + } + } + } + + for (const auto & map : discovered_maps) + { + /// map can only contain a pair of elements (key -> value) + if (map.second.size() > 2) + continue; + + /// check types of key and value + const auto key_num = map.second.front(); + auto & key_col = merged_block.getByPosition(key_num); + if (isInPrimaryKey(description, key_col.name, key_num)) + continue; + + auto & key_nested_type = static_cast(key_col.type.get())->getNestedType(); + /// key can only be integral + if (!key_nested_type->isNumeric() || key_nested_type->getName() == "Float32" || key_nested_type->getName() == "Float64") + continue; + + const auto value_num = map.second.back(); + auto & value_col = merged_block.getByPosition(value_num); + if (isInPrimaryKey(description, value_col.name, value_num)) + continue; + + auto & value_nested_type = static_cast(value_col.type.get())->getNestedType(); + /// value can be any arithmetic type except date and datetime + if (!value_nested_type->isNumeric() || value_nested_type->getName() == "Date" || value_nested_type->getName() == "DateTime") + continue; + + maps_to_sum.push_back({ key_num, value_num }); } } diff --git a/dbms/tests/queries/0_stateless/00146_summing_merge_tree_nested_map.reference b/dbms/tests/queries/0_stateless/00146_summing_merge_tree_nested_map.reference new file mode 100644 index 00000000000..165be2210cd --- /dev/null +++ b/dbms/tests/queries/0_stateless/00146_summing_merge_tree_nested_map.reference @@ -0,0 +1,4 @@ +[1,2] [100,150] +[1] [250] +[1,2] [250,150] +[2] [150] diff --git a/dbms/tests/queries/0_stateless/00146_summing_merge_tree_nested_map.sql b/dbms/tests/queries/0_stateless/00146_summing_merge_tree_nested_map.sql new file mode 100644 index 00000000000..45c71c99ab5 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00146_summing_merge_tree_nested_map.sql @@ -0,0 +1,10 @@ +drop table if exists test.nested_map; + +create table test.nested_map (d default today(), k UInt64, payload default rand(), SomeMap Nested(ID UInt32, Num Int64)) engine=SummingMergeTree(d, k, 8192); + +insert into test.nested_map (k, `SomeMap.ID`, `SomeMap.Num`) values (0,[1],[100]),(1,[1],[100]),(2,[1],[100]),(3,[1,2],[100,150]); +insert into test.nested_map (k, `SomeMap.ID`, `SomeMap.Num`) values (0,[2],[150]),(1,[1],[150]),(2,[1,2],[150,150]),(3,[1],[-100]); +optimize table test.nested_map; +select `SomeMap.ID`, `SomeMap.Num` from test.nested_map; + +drop table test.nested_map;