diff --git a/dbms/src/Processors/Merges/CollapsingSortedTransform.cpp b/dbms/src/Processors/Merges/CollapsingSortedTransform.cpp index 13a61d26caa..9fcb7cea116 100644 --- a/dbms/src/Processors/Merges/CollapsingSortedTransform.cpp +++ b/dbms/src/Processors/Merges/CollapsingSortedTransform.cpp @@ -25,7 +25,7 @@ CollapsingSortedTransform::CollapsingSortedTransform( WriteBuffer * out_row_sources_buf_, bool use_average_block_sizes) : IMergingTransform(num_inputs, header, header, true) - , merged_data(header, use_average_block_sizes, max_block_size) + , merged_data(header.cloneEmptyColumns(), use_average_block_sizes, max_block_size) , description(std::move(description_)) , sign_column_number(header.getPositionByName(sign_column)) , out_row_sources_buf(out_row_sources_buf_) diff --git a/dbms/src/Processors/Merges/MergedData.h b/dbms/src/Processors/Merges/MergedData.h index e5a8a541aa5..37dd3c62587 100644 --- a/dbms/src/Processors/Merges/MergedData.h +++ b/dbms/src/Processors/Merges/MergedData.h @@ -13,12 +13,9 @@ namespace ErrorCodes class MergedData { public: - explicit MergedData(const Block & header, bool use_average_block_size_, UInt64 max_block_size_) - : max_block_size(max_block_size_), use_average_block_size(use_average_block_size_) + explicit MergedData(MutableColumns columns_, bool use_average_block_size_, UInt64 max_block_size_) + : columns(std::move(columns_)), max_block_size(max_block_size_), use_average_block_size(use_average_block_size_) { - columns.reserve(header.columns()); - for (const auto & column : header) - columns.emplace_back(column.type->createColumn()); } /// Pull will be called at next prepare call. diff --git a/dbms/src/Processors/Merges/MergingSortedTransform.cpp b/dbms/src/Processors/Merges/MergingSortedTransform.cpp index 226b29c36dc..7b7e4fcf62c 100644 --- a/dbms/src/Processors/Merges/MergingSortedTransform.cpp +++ b/dbms/src/Processors/Merges/MergingSortedTransform.cpp @@ -24,7 +24,7 @@ MergingSortedTransform::MergingSortedTransform( bool use_average_block_sizes, bool have_all_inputs_) : IMergingTransform(num_inputs, header, header, have_all_inputs_) - , merged_data(header, use_average_block_sizes, max_block_size) + , merged_data(header.cloneEmptyColumns(), use_average_block_sizes, max_block_size) , description(std::move(description_)) , limit(limit_) , quiet(quiet_) diff --git a/dbms/src/Processors/Merges/ReplacingSortedTransform.cpp b/dbms/src/Processors/Merges/ReplacingSortedTransform.cpp index 2de67707d6d..65654a98764 100644 --- a/dbms/src/Processors/Merges/ReplacingSortedTransform.cpp +++ b/dbms/src/Processors/Merges/ReplacingSortedTransform.cpp @@ -16,7 +16,7 @@ ReplacingSortedTransform::ReplacingSortedTransform( WriteBuffer * out_row_sources_buf_, bool use_average_block_sizes) : IMergingTransform(num_inputs, header, header, true) - , merged_data(header, use_average_block_sizes, max_block_size) + , merged_data(header.cloneEmptyColumns(), use_average_block_sizes, max_block_size) , description(std::move(description_)) , out_row_sources_buf(out_row_sources_buf_) , chunk_allocator(num_inputs + max_row_refs) diff --git a/dbms/src/Processors/Merges/SummingSortedTransform.cpp b/dbms/src/Processors/Merges/SummingSortedTransform.cpp index 0c4052e821d..45f3f9b71c9 100644 --- a/dbms/src/Processors/Merges/SummingSortedTransform.cpp +++ b/dbms/src/Processors/Merges/SummingSortedTransform.cpp @@ -4,261 +4,302 @@ #include #include #include +#include namespace DB { namespace { -bool isInPrimaryKey(const SortDescription & description, const std::string & name, const size_t number) -{ - for (auto & desc : description) - if (desc.column_name == name || (desc.column_name.empty() && desc.column_number == number)) - return true; - - return false; -} - -/// Returns true if merge result is not empty -bool mergeMap(const SummingSortedTransform::MapDescription & desc, Row & row, SortCursor & cursor) -{ - /// Strongly non-optimal. - - Row & left = row; - Row right(left.size()); - - for (size_t col_num : desc.key_col_nums) - right[col_num] = (*cursor->all_columns[col_num])[cursor->pos].template get(); - - for (size_t col_num : desc.val_col_nums) - right[col_num] = (*cursor->all_columns[col_num])[cursor->pos].template get(); - - auto at_ith_column_jth_row = [&](const Row & matrix, size_t i, size_t j) -> const Field & + bool isInPrimaryKey(const SortDescription & description, const std::string & name, const size_t number) { - return matrix[i].get()[j]; - }; + for (auto & desc : description) + if (desc.column_name == name || (desc.column_name.empty() && desc.column_number == number)) + return true; - auto tuple_of_nth_columns_at_jth_row = [&](const Row & matrix, const ColumnNumbers & col_nums, size_t j) -> Array - { - size_t size = col_nums.size(); - Array res(size); - for (size_t col_num_index = 0; col_num_index < size; ++col_num_index) - res[col_num_index] = at_ith_column_jth_row(matrix, col_nums[col_num_index], j); - return res; - }; - - std::map merged; - - auto accumulate = [](Array & dst, const Array & src) - { - bool has_non_zero = false; - size_t size = dst.size(); - for (size_t i = 0; i < size; ++i) - if (applyVisitor(FieldVisitorSum(src[i]), dst[i])) - has_non_zero = true; - return has_non_zero; - }; - - auto merge = [&](const Row & matrix) - { - size_t rows = matrix[desc.key_col_nums[0]].get().size(); - - for (size_t j = 0; j < rows; ++j) - { - Array key = tuple_of_nth_columns_at_jth_row(matrix, desc.key_col_nums, j); - Array value = tuple_of_nth_columns_at_jth_row(matrix, desc.val_col_nums, j); - - auto it = merged.find(key); - if (merged.end() == it) - merged.emplace(std::move(key), std::move(value)); - else - { - if (!accumulate(it->second, value)) - merged.erase(it); - } - } - }; - - merge(left); - merge(right); - - for (size_t col_num : desc.key_col_nums) - row[col_num] = Array(merged.size()); - for (size_t col_num : desc.val_col_nums) - row[col_num] = Array(merged.size()); - - size_t row_num = 0; - for (const auto & key_value : merged) - { - for (size_t col_num_index = 0, size = desc.key_col_nums.size(); col_num_index < size; ++col_num_index) - row[desc.key_col_nums[col_num_index]].get()[row_num] = key_value.first[col_num_index]; - - for (size_t col_num_index = 0, size = desc.val_col_nums.size(); col_num_index < size; ++col_num_index) - row[desc.val_col_nums[col_num_index]].get()[row_num] = key_value.second[col_num_index]; - - ++row_num; + return false; } - return row_num != 0; -} + /// Returns true if merge result is not empty + bool mergeMap(const SummingSortedTransform::MapDescription & desc, Row & row, SortCursor & cursor) + { + /// Strongly non-optimal. + + Row & left = row; + Row right(left.size()); + + for (size_t col_num : desc.key_col_nums) + right[col_num] = (*cursor->all_columns[col_num])[cursor->pos].template get(); + + for (size_t col_num : desc.val_col_nums) + right[col_num] = (*cursor->all_columns[col_num])[cursor->pos].template get(); + + auto at_ith_column_jth_row = [&](const Row & matrix, size_t i, size_t j) -> const Field & + { + return matrix[i].get()[j]; + }; + + auto tuple_of_nth_columns_at_jth_row = [&](const Row & matrix, const ColumnNumbers & col_nums, size_t j) -> Array + { + size_t size = col_nums.size(); + Array res(size); + for (size_t col_num_index = 0; col_num_index < size; ++col_num_index) + res[col_num_index] = at_ith_column_jth_row(matrix, col_nums[col_num_index], j); + return res; + }; + + std::map merged; + + auto accumulate = [](Array & dst, const Array & src) + { + bool has_non_zero = false; + size_t size = dst.size(); + for (size_t i = 0; i < size; ++i) + if (applyVisitor(FieldVisitorSum(src[i]), dst[i])) + has_non_zero = true; + return has_non_zero; + }; + + auto merge = [&](const Row & matrix) + { + size_t rows = matrix[desc.key_col_nums[0]].get().size(); + + for (size_t j = 0; j < rows; ++j) + { + Array key = tuple_of_nth_columns_at_jth_row(matrix, desc.key_col_nums, j); + Array value = tuple_of_nth_columns_at_jth_row(matrix, desc.val_col_nums, j); + + auto it = merged.find(key); + if (merged.end() == it) + merged.emplace(std::move(key), std::move(value)); + else + { + if (!accumulate(it->second, value)) + merged.erase(it); + } + } + }; + + merge(left); + merge(right); + + for (size_t col_num : desc.key_col_nums) + row[col_num] = Array(merged.size()); + for (size_t col_num : desc.val_col_nums) + row[col_num] = Array(merged.size()); + + size_t row_num = 0; + for (const auto & key_value : merged) + { + for (size_t col_num_index = 0, size = desc.key_col_nums.size(); col_num_index < size; ++col_num_index) + row[desc.key_col_nums[col_num_index]].get()[row_num] = key_value.first[col_num_index]; + + for (size_t col_num_index = 0, size = desc.val_col_nums.size(); col_num_index < size; ++col_num_index) + row[desc.val_col_nums[col_num_index]].get()[row_num] = key_value.second[col_num_index]; + + ++row_num; + } + + return row_num != 0; + } + + SummingSortedTransform::ColumnsDefinition defineColumns( + const Block & header, + const SortDescription & description, + const Names & column_names_to_sum) + { + size_t num_columns = header.columns(); + SummingSortedTransform::ColumnsDefinition def; + + /// name of nested structure -> the column numbers that refer to it. + std::unordered_map> discovered_maps; + + /** Fill in the column numbers, which must be summed. + * This can only be numeric columns that are not part of the sort key. + * If a non-empty column_names_to_sum is specified, then we only take these columns. + * Some columns from column_names_to_sum may not be found. This is ignored. + */ + for (size_t i = 0; i < num_columns; ++i) + { + const ColumnWithTypeAndName & column = header.safeGetByPosition(i); + + /// Discover nested Maps and find columns for summation + if (typeid_cast(column.type.get())) + { + const auto map_name = Nested::extractTableName(column.name); + /// if nested table name ends with `Map` it is a possible candidate for special handling + if (map_name == column.name || !endsWith(map_name, "Map")) + { + def.column_numbers_not_to_aggregate.push_back(i); + continue; + } + + discovered_maps[map_name].emplace_back(i); + } + else + { + bool is_agg_func = WhichDataType(column.type).isAggregateFunction(); + + /// There are special const columns for example after prewhere sections. + if ((!column.type->isSummable() && !is_agg_func) || isColumnConst(*column.column)) + { + def.column_numbers_not_to_aggregate.push_back(i); + continue; + } + + /// Are they inside the PK? + if (isInPrimaryKey(description, column.name, i)) + { + def.column_numbers_not_to_aggregate.push_back(i); + continue; + } + + if (column_names_to_sum.empty() + || column_names_to_sum.end() != + std::find(column_names_to_sum.begin(), column_names_to_sum.end(), column.name)) + { + // Create aggregator to sum this column + SummingSortedTransform::AggregateDescription desc; + desc.is_agg_func_type = is_agg_func; + desc.column_numbers = {i}; + + if (!is_agg_func) + { + desc.init("sumWithOverflow", {column.type}); + } + + def.columns_to_aggregate.emplace_back(std::move(desc)); + } + else + { + // Column is not going to be summed, use last value + def.column_numbers_not_to_aggregate.push_back(i); + } + } + } + + /// select actual nested Maps from list of candidates + for (const auto & map : discovered_maps) + { + /// map should contain at least two elements (key -> value) + if (map.second.size() < 2) + { + for (auto col : map.second) + def.column_numbers_not_to_aggregate.push_back(col); + continue; + } + + /// no elements of map could be in primary key + auto column_num_it = map.second.begin(); + for (; column_num_it != map.second.end(); ++column_num_it) + if (isInPrimaryKey(description, header.safeGetByPosition(*column_num_it).name, *column_num_it)) + break; + if (column_num_it != map.second.end()) + { + for (auto col : map.second) + def.column_numbers_not_to_aggregate.push_back(col); + continue; + } + + DataTypes argument_types; + SummingSortedTransform::AggregateDescription desc; + SummingSortedTransform::MapDescription map_desc; + + column_num_it = map.second.begin(); + for (; column_num_it != map.second.end(); ++column_num_it) + { + const ColumnWithTypeAndName & key_col = header.safeGetByPosition(*column_num_it); + const String & name = key_col.name; + const IDataType & nested_type = *assert_cast(*key_col.type).getNestedType(); + + if (column_num_it == map.second.begin() + || endsWith(name, "ID") + || endsWith(name, "Key") + || endsWith(name, "Type")) + { + if (!nested_type.isValueRepresentedByInteger() && !isStringOrFixedString(nested_type)) + break; + + map_desc.key_col_nums.push_back(*column_num_it); + } + else + { + if (!nested_type.isSummable()) + break; + + map_desc.val_col_nums.push_back(*column_num_it); + } + + // Add column to function arguments + desc.column_numbers.push_back(*column_num_it); + argument_types.push_back(key_col.type); + } + + if (column_num_it != map.second.end()) + { + for (auto col : map.second) + def.column_numbers_not_to_aggregate.push_back(col); + continue; + } + + if (map_desc.key_col_nums.size() == 1) + { + // Create summation for all value columns in the map + desc.init("sumMapWithOverflow", argument_types); + def.columns_to_aggregate.emplace_back(std::move(desc)); + } + else + { + // Fall back to legacy mergeMaps for composite keys + for (auto col : map.second) + def.column_numbers_not_to_aggregate.push_back(col); + def.maps_to_sum.emplace_back(std::move(map_desc)); + } + } + } + + MutableColumns getMergedDataColumns( + const Block & header, + const SummingSortedTransform::ColumnsDefinition & columns_definition) + { + MutableColumns columns; + columns.reserve(columns_definition.getNumColumns()); + + for (auto & desc : columns_definition.columns_to_aggregate) + { + // Wrap aggregated columns in a tuple to match function signature + if (!desc.is_agg_func_type && isTuple(desc.function->getReturnType())) + { + size_t tuple_size = desc.column_numbers.size(); + MutableColumns tuple_columns(tuple_size); + for (size_t i = 0; i < tuple_size; ++i) + tuple_columns[i] = header.safeGetByPosition(desc.column_numbers[i]).column->cloneEmpty(); + + columns.emplace_back(ColumnTuple::create(std::move(tuple_columns))); + } + else + columns.emplace_back(header.safeGetByPosition(desc.column_numbers[0]).column->cloneEmpty()); + } + + for (auto & column_number : columns_definition.column_numbers_not_to_aggregate) + columns.emplace_back(header.safeGetByPosition(column_number).type->createColumn()); + + return columns; + } } SummingSortedTransform::SummingSortedTransform( size_t num_inputs, const Block & header, - SortDescription description, + SortDescription description_, /// List of columns to be summed. If empty, all numeric columns that are not in the description are taken. const Names & column_names_to_sum, size_t max_block_size) : IMergingTransform(num_inputs, header, header, true) + , columns_definition(defineColumns(header, description_, column_names_to_sum)) + , merged_data(getMergedDataColumns(header, columns_definition), false, max_block_size) { size_t num_columns = header.columns(); current_row.resize(num_columns); - - /// name of nested structure -> the column numbers that refer to it. - std::unordered_map> discovered_maps; - - /** Fill in the column numbers, which must be summed. - * This can only be numeric columns that are not part of the sort key. - * If a non-empty column_names_to_sum is specified, then we only take these columns. - * Some columns from column_names_to_sum may not be found. This is ignored. - */ - for (size_t i = 0; i < num_columns; ++i) - { - const ColumnWithTypeAndName & column = header.safeGetByPosition(i); - - /// Discover nested Maps and find columns for summation - if (typeid_cast(column.type.get())) - { - const auto map_name = Nested::extractTableName(column.name); - /// if nested table name ends with `Map` it is a possible candidate for special handling - if (map_name == column.name || !endsWith(map_name, "Map")) - { - column_numbers_not_to_aggregate.push_back(i); - continue; - } - - discovered_maps[map_name].emplace_back(i); - } - else - { - bool is_agg_func = WhichDataType(column.type).isAggregateFunction(); - - /// There are special const columns for example after prewere sections. - if ((!column.type->isSummable() && !is_agg_func) || isColumnConst(*column.column)) - { - column_numbers_not_to_aggregate.push_back(i); - continue; - } - - /// Are they inside the PK? - if (isInPrimaryKey(description, column.name, i)) - { - column_numbers_not_to_aggregate.push_back(i); - continue; - } - - if (column_names_to_sum.empty() - || column_names_to_sum.end() != - std::find(column_names_to_sum.begin(), column_names_to_sum.end(), column.name)) - { - // Create aggregator to sum this column - AggregateDescription desc; - desc.is_agg_func_type = is_agg_func; - desc.column_numbers = {i}; - - if (!is_agg_func) - { - desc.init("sumWithOverflow", {column.type}); - } - - columns_to_aggregate.emplace_back(std::move(desc)); - } - else - { - // Column is not going to be summed, use last value - column_numbers_not_to_aggregate.push_back(i); - } - } - } - - /// select actual nested Maps from list of candidates - for (const auto & map : discovered_maps) - { - /// map should contain at least two elements (key -> value) - if (map.second.size() < 2) - { - for (auto col : map.second) - column_numbers_not_to_aggregate.push_back(col); - continue; - } - - /// no elements of map could be in primary key - auto column_num_it = map.second.begin(); - for (; column_num_it != map.second.end(); ++column_num_it) - if (isInPrimaryKey(description, header.safeGetByPosition(*column_num_it).name, *column_num_it)) - break; - if (column_num_it != map.second.end()) - { - for (auto col : map.second) - column_numbers_not_to_aggregate.push_back(col); - continue; - } - - DataTypes argument_types; - AggregateDescription desc; - MapDescription map_desc; - - column_num_it = map.second.begin(); - for (; column_num_it != map.second.end(); ++column_num_it) - { - const ColumnWithTypeAndName & key_col = header.safeGetByPosition(*column_num_it); - const String & name = key_col.name; - const IDataType & nested_type = *static_cast(key_col.type.get())->getNestedType(); - - if (column_num_it == map.second.begin() - || endsWith(name, "ID") - || endsWith(name, "Key") - || endsWith(name, "Type")) - { - if (!nested_type.isValueRepresentedByInteger() && !isStringOrFixedString(nested_type)) - break; - - map_desc.key_col_nums.push_back(*column_num_it); - } - else - { - if (!nested_type.isSummable()) - break; - - map_desc.val_col_nums.push_back(*column_num_it); - } - - // Add column to function arguments - desc.column_numbers.push_back(*column_num_it); - argument_types.push_back(key_col.type); - } - - if (column_num_it != map.second.end()) - { - for (auto col : map.second) - column_numbers_not_to_aggregate.push_back(col); - continue; - } - - if (map_desc.key_col_nums.size() == 1) - { - // Create summation for all value columns in the map - desc.init("sumMapWithOverflow", argument_types); - columns_to_aggregate.emplace_back(std::move(desc)); - } - else - { - // Fall back to legacy mergeMaps for composite keys - for (auto col : map.second) - column_numbers_not_to_aggregate.push_back(col); - maps_to_sum.emplace_back(std::move(map_desc)); - } - } } } diff --git a/dbms/src/Processors/Merges/SummingSortedTransform.h b/dbms/src/Processors/Merges/SummingSortedTransform.h index f82c1f9e6db..20b49aa7ac8 100644 --- a/dbms/src/Processors/Merges/SummingSortedTransform.h +++ b/dbms/src/Processors/Merges/SummingSortedTransform.h @@ -8,6 +8,7 @@ #include #include #include +#include namespace DB { @@ -20,7 +21,7 @@ public: SummingSortedTransform( size_t num_inputs, const Block & header, - SortDescription description, + SortDescription description_, /// List of columns to be summed. If empty, all numeric columns that are not in the description are taken. const Names & column_names_to_sum, size_t max_block_size); @@ -80,7 +81,7 @@ public: struct SummingMergedData : public MergedData { public: - + using MergedData::MergedData; }; /// Stores numbers of key-columns and value-columns. @@ -90,12 +91,51 @@ public: std::vector val_col_nums; }; -private: - /// Columns with which values should be summed. - ColumnNumbers column_numbers_not_to_aggregate; + struct ColumnsDefinition + { + /// Columns with which values should be summed. + ColumnNumbers column_numbers_not_to_aggregate; + /// Columns which should be aggregated. + std::vector columns_to_aggregate; + /// Mapping for nested columns. + std::vector maps_to_sum; - std::vector columns_to_aggregate; - std::vector maps_to_sum; + size_t getNumColumns() const { return column_numbers_not_to_aggregate.size() + columns_to_aggregate.size(); } + }; + +private: + Row current_row; + bool current_row_is_zero = true; /// Are all summed columns zero (or empty)? It is updated incrementally. + + ColumnsDefinition columns_definition; + SummingMergedData merged_data; + + SortDescription description; + + /// Chunks currently being merged. + std::vector source_chunks; + SortCursorImpls cursors; + + /// In merging algorithm, we need to compare current sort key with the last one. + /// So, sorting columns for last row needed to be stored. + /// In order to do it, we extend lifetime of last chunk and it's sort columns (from corresponding sort cursor). + Chunk last_chunk; + ColumnRawPtrs last_chunk_sort_columns; /// Point to last_chunk if valid. + + struct RowRef + { + ColumnRawPtrs * sort_columns = nullptr; /// Point to sort_columns from SortCursor or last_chunk_sort_columns. + UInt64 row_number = 0; + }; + + RowRef last_row; + + SortingHeap queue; + bool is_queue_initialized = false; + + void insertRow(); + void merge(); + void updateCursor(Chunk chunk, size_t source_num); }; } diff --git a/dbms/src/Processors/Merges/VersionedCollapsingTransform.cpp b/dbms/src/Processors/Merges/VersionedCollapsingTransform.cpp index 9191123c878..4042c146724 100644 --- a/dbms/src/Processors/Merges/VersionedCollapsingTransform.cpp +++ b/dbms/src/Processors/Merges/VersionedCollapsingTransform.cpp @@ -14,7 +14,7 @@ VersionedCollapsingTransform::VersionedCollapsingTransform( WriteBuffer * out_row_sources_buf_, bool use_average_block_sizes) : IMergingTransform(num_inputs, header, header, true) - , merged_data(header, use_average_block_sizes, max_block_size) + , merged_data(header.cloneEmptyColumns(), use_average_block_sizes, max_block_size) , description(std::move(description_)) , out_row_sources_buf(out_row_sources_buf_) , max_rows_in_queue(MAX_ROWS_IN_MULTIVERSION_QUEUE - 1) /// -1 for +1 in FixedSizeDequeWithGaps's internal buffer