diff --git a/src/Interpreters/HashJoin.cpp b/src/Interpreters/HashJoin.cpp index ca6aa414bc9..1ea12955409 100644 --- a/src/Interpreters/HashJoin.cpp +++ b/src/Interpreters/HashJoin.cpp @@ -1034,12 +1034,6 @@ public: } }; - struct LazyOutput - { - PaddedPODArray blocks; - PaddedPODArray row_nums; - }; - AddedColumns( const Block & left_block, const Block & block_with_columns_to_add, @@ -1055,12 +1049,9 @@ public: size_t num_columns_to_add = block_with_columns_to_add.columns(); if (is_asof_join) ++num_columns_to_add; - has_columns_to_add = num_columns_to_add > 0; columns.reserve(num_columns_to_add); type_name.reserve(num_columns_to_add); right_indexes.reserve(num_columns_to_add); - lazy_output.blocks.reserve(rows_to_add); - lazy_output.row_nums.reserve(rows_to_add); for (const auto & src_column : block_with_columns_to_add) { @@ -1095,9 +1086,162 @@ public: } } + virtual ~AddedColumns() { } + size_t size() const { return columns.size(); } - void buildOutput() + virtual void buildOutput() + { + } + + ColumnWithTypeAndName moveColumn(size_t i) + { + return ColumnWithTypeAndName(std::move(columns[i]), type_name[i].type, type_name[i].qualified_name); + } + + virtual void appendFromBlock(const Block & block, size_t row_num, bool has_defaults) + { + if (has_defaults) + applyLazyDefaults(); + +#ifndef NDEBUG + for (size_t j = 0; j < right_indexes.size(); ++j) + { + const auto * column_from_block = block.getByPosition(right_indexes[j]).column.get(); + const auto * dest_column = columns[j].get(); + if (auto * nullable_col = nullable_column_ptrs[j]) + { + if (!is_join_get) + throw Exception(ErrorCodes::LOGICAL_ERROR, + "Columns {} and {} can have different nullability only in joinGetOrNull", + dest_column->getName(), column_from_block->getName()); + dest_column = nullable_col->getNestedColumnPtr().get(); + } + /** Using dest_column->structureEquals(*column_from_block) will not work for low cardinality columns, + * because dictionaries can be different, while calling insertFrom on them is safe, for example: + * ColumnLowCardinality(size = 0, UInt8(size = 0), ColumnUnique(size = 1, String(size = 1))) + * and + * ColumnLowCardinality(size = 0, UInt16(size = 0), ColumnUnique(size = 1, String(size = 1))) + */ + if (typeid(*dest_column) != typeid(*column_from_block)) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Columns {} and {} have different types {} and {}", + dest_column->getName(), column_from_block->getName(), + demangle(typeid(*dest_column).name()), demangle(typeid(*column_from_block).name())); + } +#endif + if (is_join_get) + { + size_t right_indexes_size = right_indexes.size(); + for (size_t j = 0; j < right_indexes_size; ++j) + { + const auto & column_from_block = block.getByPosition(right_indexes[j]); + if (auto * nullable_col = nullable_column_ptrs[j]) + nullable_col->insertFromNotNullable(*column_from_block.column, row_num); + else + columns[j]->insertFrom(*column_from_block.column, row_num); + } + } + else + { + size_t right_indexes_size = right_indexes.size(); + for (size_t j = 0; j < right_indexes_size; ++j) + { + const auto & column_from_block = block.getByPosition(right_indexes[j]); + columns[j]->insertFrom(*column_from_block.column, row_num); + } + } + } + + virtual void appendDefaultRow() + { + ++lazy_defaults_count; + } + + virtual void applyLazyDefaults() + { + if (lazy_defaults_count) + { + for (size_t j = 0, size = right_indexes.size(); j < size; ++j) + JoinCommon::addDefaultValues(*columns[j], type_name[j].type, lazy_defaults_count); + lazy_defaults_count = 0; + } + } + + const IColumn & leftAsofKey() const { return *left_asof_key; } + + std::vector join_on_keys; + + size_t max_joined_block_rows = 0; + size_t rows_to_add; + std::unique_ptr offsets_to_replicate; + bool need_filter = false; + IColumn::Filter filter; + + void reserve(bool need_replicate) + { + if (!max_joined_block_rows) + return; + + /// Do not allow big allocations when user set max_joined_block_rows to huge value + size_t reserve_size = std::min(max_joined_block_rows, DEFAULT_BLOCK_SIZE * 2); + + if (need_replicate) + /// Reserve 10% more space for columns, because some rows can be repeated + reserve_size = static_cast(1.1 * reserve_size); + + for (auto & column : columns) + column->reserve(reserve_size); + } + +protected: + MutableColumns columns; + bool is_join_get; + std::vector right_indexes; + std::vector type_name; + std::vector nullable_column_ptrs; +private: + + + + + size_t lazy_defaults_count = 0; + /// for ASOF + const IColumn * left_asof_key = nullptr; + + + void addColumn(const ColumnWithTypeAndName & src_column, const std::string & qualified_name) + { + columns.push_back(src_column.column->cloneEmpty()); + columns.back()->reserve(src_column.column->size()); + type_name.emplace_back(src_column.type, src_column.name, qualified_name); + } +}; + +class LazyAddedColumns : public AddedColumns +{ +public: + struct LazyOutput + { + PaddedPODArray blocks; + PaddedPODArray row_nums; + }; + + LazyAddedColumns( + const Block & left_block, + const Block & block_with_columns_to_add, + const Block & saved_block_sample, + const HashJoin & join, + std::vector && join_on_keys_, + bool is_asof_join, + bool is_join_get_) + : AddedColumns(left_block, block_with_columns_to_add, saved_block_sample, join, std::move(join_on_keys_), is_asof_join, is_join_get_) + { + has_columns_to_add = block_with_columns_to_add.columns() > 0; + lazy_output.blocks.reserve(rows_to_add); + lazy_output.row_nums.reserve(rows_to_add); + } + + virtual void buildOutput() override { for (size_t i = 0; i < this->size(); ++i) { @@ -1137,13 +1281,7 @@ public: } } - ColumnWithTypeAndName moveColumn(size_t i) - { - return ColumnWithTypeAndName(std::move(columns[i]), type_name[i].type, type_name[i].qualified_name); - } - - - void appendFromBlock(const Block & block, size_t row_num) + virtual void appendFromBlock(const Block & block, size_t row_num, bool) override { #ifndef NDEBUG for (size_t j = 0; j < right_indexes.size(); ++j) @@ -1154,8 +1292,8 @@ public: { if (!is_join_get) throw Exception(ErrorCodes::LOGICAL_ERROR, - "Columns {} and {} can have different nullability only in joinGetOrNull", - dest_column->getName(), column_from_block->getName()); + "Columns {} and {} can have different nullability only in joinGetOrNull", + dest_column->getName(), column_from_block->getName()); dest_column = nullable_col->getNestedColumnPtr().get(); } /** Using dest_column->structureEquals(*column_from_block) will not work for low cardinality columns, @@ -1166,8 +1304,8 @@ public: */ if (typeid(*dest_column) != typeid(*column_from_block)) throw Exception(ErrorCodes::LOGICAL_ERROR, "Columns {} and {} have different types {} and {}", - dest_column->getName(), column_from_block->getName(), - demangle(typeid(*dest_column).name()), demangle(typeid(*column_from_block).name())); + dest_column->getName(), column_from_block->getName(), + demangle(typeid(*dest_column).name()), demangle(typeid(*column_from_block).name())); } #endif if (has_columns_to_add) @@ -1177,7 +1315,7 @@ public: } } - void appendDefaultRow() + virtual void appendDefaultRow() override { if (has_columns_to_add) { @@ -1186,54 +1324,14 @@ public: } } - const IColumn & leftAsofKey() const { return *left_asof_key; } - - std::vector join_on_keys; + virtual void applyLazyDefaults() override { } +private : // The default row is represented by an empty RowRef, so that fixed-size blocks can be generated sequentially, // default_count cannot represent the position of the row LazyOutput lazy_output; - - size_t max_joined_block_rows = 0; - size_t rows_to_add; - std::unique_ptr offsets_to_replicate; - bool need_filter = false; - IColumn::Filter filter; - - void reserve(bool need_replicate) - { - if (!max_joined_block_rows) - return; - - /// Do not allow big allocations when user set max_joined_block_rows to huge value - size_t reserve_size = std::min(max_joined_block_rows, DEFAULT_BLOCK_SIZE * 2); - - if (need_replicate) - /// Reserve 10% more space for columns, because some rows can be repeated - reserve_size = static_cast(1.1 * reserve_size); - - for (auto & column : columns) - column->reserve(reserve_size); - } - -private: - std::vector type_name; - MutableColumns columns; - std::vector nullable_column_ptrs; - - std::vector right_indexes; bool has_columns_to_add; - /// for ASOF - const IColumn * left_asof_key = nullptr; - bool is_join_get; - - void addColumn(const ColumnWithTypeAndName & src_column, const std::string & qualified_name) - { - columns.push_back(src_column.column->cloneEmpty()); - columns.back()->reserve(src_column.column->size()); - type_name.emplace_back(src_column.type, src_column.name, qualified_name); - } }; template @@ -1333,7 +1431,7 @@ public: } }; -template +template void addFoundRowAll( const typename Map::mapped_type & mapped, AddedColumns & added, @@ -1341,6 +1439,9 @@ void addFoundRowAll( KnownRowsHolder & known_rows [[maybe_unused]], JoinStuff::JoinUsedFlags * used_flags [[maybe_unused]]) { + if constexpr (add_missing) + added.applyLazyDefaults(); + if constexpr (multiple_disjuncts) { std::unique_ptr::Type>> new_known_rows_ptr; @@ -1349,7 +1450,7 @@ void addFoundRowAll( { if (!known_rows.isKnown(std::make_pair(it->block, it->row_num))) { - added.appendFromBlock(*it->block, it->row_num); + added.appendFromBlock(*it->block, it->row_num, false); ++current_offset; if (!new_known_rows_ptr) { @@ -1373,7 +1474,7 @@ void addFoundRowAll( { for (auto it = mapped.begin(); it.ok(); ++it) { - added.appendFromBlock(*it->block, it->row_num); + added.appendFromBlock(*it->block, it->row_num, false); ++current_offset; } } @@ -1462,7 +1563,7 @@ NO_INLINE size_t joinRightColumns( else used_flags.template setUsed(find_result); - added_columns.appendFromBlock(*row_ref.block, row_ref.row_num); + added_columns.appendFromBlock(*row_ref.block, row_ref.row_num, join_features.add_missing); } else addNotFoundRow(added_columns, current_offset); @@ -1472,7 +1573,7 @@ NO_INLINE size_t joinRightColumns( setUsed(added_columns.filter, i); used_flags.template setUsed(find_result); auto used_flags_opt = join_features.need_flags ? &used_flags : nullptr; - addFoundRowAll(mapped, added_columns, current_offset, known_rows, used_flags_opt); + addFoundRowAll(mapped, added_columns, current_offset, known_rows, used_flags_opt); } else if constexpr ((join_features.is_any_join || join_features.is_semi_join) && join_features.right) { @@ -1482,7 +1583,7 @@ NO_INLINE size_t joinRightColumns( { auto used_flags_opt = join_features.need_flags ? &used_flags : nullptr; setUsed(added_columns.filter, i); - addFoundRowAll(mapped, added_columns, current_offset, known_rows, used_flags_opt); + addFoundRowAll(mapped, added_columns, current_offset, known_rows, used_flags_opt); } } else if constexpr (join_features.is_any_join && KIND == JoinKind::Inner) @@ -1493,7 +1594,7 @@ NO_INLINE size_t joinRightColumns( if (used_once) { setUsed(added_columns.filter, i); - added_columns.appendFromBlock(*mapped.block, mapped.row_num); + added_columns.appendFromBlock(*mapped.block, mapped.row_num, join_features.add_missing); } break; @@ -1511,7 +1612,7 @@ NO_INLINE size_t joinRightColumns( { setUsed(added_columns.filter, i); used_flags.template setUsed(find_result); - added_columns.appendFromBlock(*mapped.block, mapped.row_num); + added_columns.appendFromBlock(*mapped.block, mapped.row_num, join_features.add_missing); if (join_features.is_any_or_semi_join) { @@ -1701,14 +1802,19 @@ Block HashJoin::joinBlockImpl( * but they will not be used at this stage of joining (and will be in `AdderNonJoined`), and they need to be skipped. * For ASOF, the last column is used as the ASOF column */ - AddedColumns added_columns( - block, - block_with_columns_to_add, - savedBlockSample(), - *this, - std::move(join_on_keys), - join_features.is_asof_join, - is_join_get); + std::unique_ptr added_columns_ptr; + if (!join_features.is_any_join) + { + added_columns_ptr = std::make_unique( + block, block_with_columns_to_add, savedBlockSample(), *this, std::move(join_on_keys), join_features.is_asof_join, is_join_get); + } + else + { + added_columns_ptr = std::make_unique( + block, block_with_columns_to_add, savedBlockSample(), *this, std::move(join_on_keys), join_features.is_asof_join, is_join_get); + } + + AddedColumns & added_columns = * added_columns_ptr; bool has_required_right_keys = (required_right_keys.columns() != 0); added_columns.need_filter = join_features.need_filter || has_required_right_keys;