diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 015567a4e5b..cd246876fcc 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -325,6 +325,7 @@ class IColumn; \ M(Bool, join_use_nulls, false, "Use NULLs for non-joined rows of outer JOINs for types that can be inside Nullable. If false, use default value of corresponding columns data type.", IMPORTANT) \ \ + M(Int32, join_output_by_rowlist_perkey_rows_threshold, 5, "The lower limit of per-key average rows in the right table to determine whether to output by row list in hash join.", 0) \ M(JoinStrictness, join_default_strictness, JoinStrictness::All, "Set default strictness in JOIN query. Possible values: empty string, 'ANY', 'ALL'. If empty, query without strictness will throw exception.", 0) \ M(Bool, any_join_distinct_right_table_keys, false, "Enable old ANY JOIN logic with many-to-one left-to-right table keys mapping for all ANY JOINs. It leads to confusing not equal results for 't1 ANY LEFT JOIN t2' and 't2 ANY RIGHT JOIN t1'. ANY RIGHT JOIN needs one-to-many keys mapping to be consistent with LEFT one.", IMPORTANT) \ M(Bool, single_join_prefer_left_table, true, "For single JOIN in case of identifier ambiguity prefer left table", IMPORTANT) \ diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 4603bab942e..0bc79d6ff57 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -88,6 +88,7 @@ static std::initializer_list void AddedColumns::buildOutput() -{ -} +template<> +void AddedColumns::buildOutput() {} + +template<> +void AddedColumns::buildJoinGetOutput() {} + +template<> +template +void AddedColumns::buildOutputFromBlocks() {} template<> void AddedColumns::buildOutput() { - for (size_t i = 0; i < this->size(); ++i) + if (!output_by_row_list) + buildOutputFromBlocks(); + else { - auto& col = columns[i]; - size_t default_count = 0; - auto apply_default = [&]() + if (join_data_avg_perkey_rows < output_by_row_list_threshold) + buildOutputFromBlocks(); + else { - if (default_count > 0) + for (size_t i = 0; i < this->size(); ++i) { - JoinCommon::addDefaultValues(*col, type_name[i].type, default_count); - default_count = 0; - } - }; - - for (size_t j = 0; j < lazy_output.blocks.size(); ++j) - { - if (!lazy_output.blocks[j]) - { - default_count++; - continue; - } - apply_default(); - const auto & column_from_block = reinterpret_cast(lazy_output.blocks[j])->getByPosition(right_indexes[i]); - /// If it's joinGetOrNull, we need to wrap not-nullable columns in StorageJoin. - if (is_join_get) - { - if (auto * nullable_col = typeid_cast(col.get()); - nullable_col && !column_from_block.column->isNullable()) + auto & col = columns[i]; + for (auto row_ref_i : lazy_output.row_refs) { - nullable_col->insertFromNotNullable(*column_from_block.column, lazy_output.row_nums[j]); - continue; + if (row_ref_i) + { + const RowRefList * row_ref_list = reinterpret_cast(row_ref_i); + for (auto it = row_ref_list->begin(); it.ok(); ++it) + col->insertFrom(*it->block->getByPosition(right_indexes[i]).column, it->row_num); + } + else + type_name[i].type->insertDefaultInto(*col); } } - col->insertFrom(*column_from_block.column, lazy_output.row_nums[j]); } - apply_default(); + } +} + +template<> +void AddedColumns::buildJoinGetOutput() +{ + for (size_t i = 0; i < this->size(); ++i) + { + auto & col = columns[i]; + for (auto row_ref_i : lazy_output.row_refs) + { + if (!row_ref_i) + { + type_name[i].type->insertDefaultInto(*col); + continue; + } + const auto * row_ref = reinterpret_cast(row_ref_i); + const auto & column_from_block = row_ref->block->getByPosition(right_indexes[i]); + if (auto * nullable_col = typeid_cast(col.get()); nullable_col && !column_from_block.column->isNullable()) + nullable_col->insertFromNotNullable(*column_from_block.column, row_ref->row_num); + else + col->insertFrom(*column_from_block.column, row_ref->row_num); + } + } +} + +template<> +template +void AddedColumns::buildOutputFromBlocks() +{ + if (this->size() == 0) + return; + std::vector blocks; + std::vector row_nums; + blocks.reserve(lazy_output.row_refs.size()); + row_nums.reserve(lazy_output.row_refs.size()); + for (auto row_ref_i : lazy_output.row_refs) + { + if (row_ref_i) + { + if constexpr (from_row_list) + { + const RowRefList * row_ref_list = reinterpret_cast(row_ref_i); + for (auto it = row_ref_list->begin(); it.ok(); ++it) + { + blocks.emplace_back(it->block); + row_nums.emplace_back(it->row_num); + } + } + else + { + const RowRef * row_ref = reinterpret_cast(row_ref_i); + blocks.emplace_back(row_ref->block); + row_nums.emplace_back(row_ref->row_num); + } + } + else + { + blocks.emplace_back(nullptr); + row_nums.emplace_back(0); + } + } + for (size_t i = 0; i < this->size(); ++i) + { + auto & col = columns[i]; + for (size_t j = 0; j < blocks.size(); ++j) + { + if (blocks[j]) + col->insertFrom(*blocks[j]->getByPosition(right_indexes[i]).column, row_nums[j]); + else + type_name[i].type->insertDefaultInto(*col); + } } } @@ -72,29 +139,27 @@ void AddedColumns::applyLazyDefaults() } template<> -void AddedColumns::applyLazyDefaults() -{ -} +void AddedColumns::applyLazyDefaults() {} template <> -void AddedColumns::appendFromBlock(const Block & block, size_t row_num,const bool has_defaults) +void AddedColumns::appendFromBlock(const RowRef * row_ref, const bool has_defaults) { if (has_defaults) applyLazyDefaults(); #ifndef NDEBUG - checkBlock(block); + checkBlock(*row_ref->block); #endif if (is_join_get) { size_t right_indexes_size = right_indexes.size(); for (size_t j = 0; j < right_indexes_size; ++j) { - const auto & column_from_block = block.getByPosition(right_indexes[j]); + const auto & column_from_block = row_ref->block->getByPosition(right_indexes[j]); if (auto * nullable_col = nullable_column_ptrs[j]) - nullable_col->insertFromNotNullable(*column_from_block.column, row_num); + nullable_col->insertFromNotNullable(*column_from_block.column, row_ref->row_num); else - columns[j]->insertFrom(*column_from_block.column, row_num); + columns[j]->insertFrom(*column_from_block.column, row_ref->row_num); } } else @@ -102,22 +167,21 @@ void AddedColumns::appendFromBlock(const Block & block, size_t row_num,co size_t right_indexes_size = right_indexes.size(); for (size_t j = 0; j < right_indexes_size; ++j) { - const auto & column_from_block = block.getByPosition(right_indexes[j]); - columns[j]->insertFrom(*column_from_block.column, row_num); + const auto & column_from_block = row_ref->block->getByPosition(right_indexes[j]); + columns[j]->insertFrom(*column_from_block.column, row_ref->row_num); } } } template <> -void AddedColumns::appendFromBlock(const Block & block, size_t row_num, bool) +void AddedColumns::appendFromBlock(const RowRef * row_ref, bool) { #ifndef NDEBUG - checkBlock(block); + checkBlock(*row_ref->block); #endif if (has_columns_to_add) { - lazy_output.blocks.emplace_back(reinterpret_cast(&block)); - lazy_output.row_nums.emplace_back(static_cast(row_num)); + lazy_output.row_refs.emplace_back(reinterpret_cast(row_ref)); } } template<> @@ -131,8 +195,7 @@ void AddedColumns::appendDefaultRow() { if (has_columns_to_add) { - lazy_output.blocks.emplace_back(0); - lazy_output.row_nums.emplace_back(0); + lazy_output.row_refs.emplace_back(0); } } } diff --git a/src/Interpreters/HashJoin/AddedColumns.h b/src/Interpreters/HashJoin/AddedColumns.h index 13a7df6f498..f1b95a63be6 100644 --- a/src/Interpreters/HashJoin/AddedColumns.h +++ b/src/Interpreters/HashJoin/AddedColumns.h @@ -50,8 +50,7 @@ public: struct LazyOutput { - PaddedPODArray blocks; - PaddedPODArray row_nums; + PaddedPODArray row_refs; }; AddedColumns( @@ -76,8 +75,7 @@ public: if constexpr (lazy) { has_columns_to_add = num_columns_to_add > 0; - lazy_output.blocks.reserve(rows_to_add); - lazy_output.row_nums.reserve(rows_to_add); + lazy_output.row_refs.reserve(rows_to_add); } columns.reserve(num_columns_to_add); @@ -115,18 +113,22 @@ public: if (columns[j]->isNullable() && !saved_column->isNullable()) nullable_column_ptrs[j] = typeid_cast(columns[j].get()); } + join_data_avg_perkey_rows = join.getJoinedData()->avgPerKeyRows(); + output_by_row_list_threshold = join.getTableJoin().outputByRowListPerkeyRowsThreshold(); } size_t size() const { return columns.size(); } void buildOutput(); + void buildJoinGetOutput(); + ColumnWithTypeAndName moveColumn(size_t i) { return ColumnWithTypeAndName(std::move(columns[i]), type_name[i].type, type_name[i].qualified_name); } - void appendFromBlock(const Block & block, size_t row_num, bool has_default); + void appendFromBlock(const RowRef * row_ref, bool has_default); void appendDefaultRow(); @@ -134,6 +136,8 @@ public: const IColumn & leftAsofKey() const { return *left_asof_key; } + static constexpr bool isLazy() { return lazy; } + Block left_block; std::vector join_on_keys; ExpressionActionsPtr additional_filter_expression; @@ -142,6 +146,9 @@ public: size_t rows_to_add; std::unique_ptr offsets_to_replicate; bool need_filter = false; + bool output_by_row_list = false; + size_t join_data_avg_perkey_rows = 0; + size_t output_by_row_list_threshold = 0; IColumn::Filter filter; void reserve(bool need_replicate) @@ -212,15 +219,22 @@ private: columns.back()->reserve(src_column.column->size()); type_name.emplace_back(src_column.type, src_column.name, qualified_name); } + + /** Build output from the blocks that extract from `RowRef` or `RowRefList`, to avoid block cache miss which may cause performance slow down. + * And This problem would happen it we directly build output from `RowRef` or `RowRefList`. + */ + template + void buildOutputFromBlocks(); }; /// Adapter class to pass into addFoundRowAll /// In joinRightColumnsWithAdditionalFilter we don't want to add rows directly into AddedColumns, /// because they need to be filtered by additional_filter_expression. -class PreSelectedRows : public std::vector +class PreSelectedRows : public std::vector { public: - void appendFromBlock(const Block & block, size_t row_num, bool /* has_default */) { this->emplace_back(&block, row_num); } + void appendFromBlock(const RowRef * row_ref, bool /* has_default */) { this->emplace_back(row_ref); } + static constexpr bool isLazy() { return false; } }; } diff --git a/src/Interpreters/HashJoin/HashJoin.cpp b/src/Interpreters/HashJoin/HashJoin.cpp index dd7d42de63e..9c07a71e614 100644 --- a/src/Interpreters/HashJoin/HashJoin.cpp +++ b/src/Interpreters/HashJoin/HashJoin.cpp @@ -495,7 +495,7 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits) } size_t rows = source_block.rows(); - + data->rows_to_join += rows; const auto & right_key_names = table_join->getAllNames(JoinTableSide::Right); ColumnPtrMap all_key_columns(right_key_names.size()); for (const auto & column_name : right_key_names) @@ -647,7 +647,7 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits) total_bytes = getTotalByteCount(); } } - + data->keys_to_join = total_rows; shrinkStoredBlocksToFit(total_bytes); return table_join->sizeLimits().check(total_rows, total_bytes, "JOIN", ErrorCodes::SET_SIZE_LIMIT_EXCEEDED); diff --git a/src/Interpreters/HashJoin/HashJoin.h b/src/Interpreters/HashJoin/HashJoin.h index 00f5ef6d214..d645b8e9273 100644 --- a/src/Interpreters/HashJoin/HashJoin.h +++ b/src/Interpreters/HashJoin/HashJoin.h @@ -345,6 +345,18 @@ public: size_t blocks_allocated_size = 0; size_t blocks_nullmaps_allocated_size = 0; + + /// Number of rows of right table to join + size_t rows_to_join = 0; + /// Number of keys of right table to join + size_t keys_to_join = 0; + + size_t avgPerKeyRows() const + { + if (keys_to_join == 0) + return 0; + return rows_to_join / keys_to_join; + } }; using RightTableDataPtr = std::shared_ptr; diff --git a/src/Interpreters/HashJoin/HashJoinMethods.h b/src/Interpreters/HashJoin/HashJoinMethods.h index 3b7a67467e3..97ad57d26ea 100644 --- a/src/Interpreters/HashJoin/HashJoinMethods.h +++ b/src/Interpreters/HashJoin/HashJoinMethods.h @@ -83,6 +83,7 @@ public: const Block & block_with_columns_to_add, const MapsTemplateVector & maps_, bool is_join_get = false); + private: template static KeyGetter createKeyGetter(const ColumnRawPtrs & key_columns, const Sizes & key_sizes); @@ -128,7 +129,7 @@ private: template static ColumnPtr buildAdditionalFilter( size_t left_start_row, - const std::vector & selected_rows, + const std::vector & selected_rows, const std::vector & row_replicate_offset, AddedColumns & added_columns); diff --git a/src/Interpreters/HashJoin/HashJoinMethodsImpl.h b/src/Interpreters/HashJoin/HashJoinMethodsImpl.h index 39ba9fc6e93..320c8851ce4 100644 --- a/src/Interpreters/HashJoin/HashJoinMethodsImpl.h +++ b/src/Interpreters/HashJoin/HashJoinMethodsImpl.h @@ -95,7 +95,10 @@ Block HashJoinMethods::joinBlockImpl( added_columns.join_on_keys.clear(); Block remaining_block = sliceBlock(block, num_joined); - added_columns.buildOutput(); + if (is_join_get) + added_columns.buildJoinGetOutput(); + else + added_columns.buildOutput(); for (size_t i = 0; i < added_columns.size(); ++i) block.insert(added_columns.moveColumn(i)); @@ -339,6 +342,8 @@ size_t HashJoinMethods::joinRightColumns( size_t rows = added_columns.rows_to_add; if constexpr (need_filter) added_columns.filter = IColumn::Filter(rows, 0); + if constexpr (!flag_per_row && (STRICTNESS == JoinStrictness::All || (STRICTNESS == JoinStrictness::Semi && KIND == JoinKind::Right))) + added_columns.output_by_row_list = true; Arena pool; @@ -381,15 +386,15 @@ size_t HashJoinMethods::joinRightColumns( const IColumn & left_asof_key = added_columns.leftAsofKey(); auto row_ref = mapped->findAsof(left_asof_key, i); - if (row_ref.block) + if (row_ref && row_ref->block) { setUsed(added_columns.filter, i); if constexpr (flag_per_row) - used_flags.template setUsed(row_ref.block, row_ref.row_num, 0); + used_flags.template setUsed(row_ref->block, row_ref->row_num, 0); else used_flags.template setUsed(find_result); - added_columns.appendFromBlock(*row_ref.block, row_ref.row_num, join_features.add_missing); + added_columns.appendFromBlock(row_ref, join_features.add_missing); } else addNotFoundRow(added_columns, current_offset); @@ -420,7 +425,7 @@ size_t HashJoinMethods::joinRightColumns( if (used_once) { setUsed(added_columns.filter, i); - added_columns.appendFromBlock(*mapped.block, mapped.row_num, join_features.add_missing); + added_columns.appendFromBlock(&mapped, join_features.add_missing); } break; @@ -438,7 +443,7 @@ size_t HashJoinMethods::joinRightColumns( { setUsed(added_columns.filter, i); used_flags.template setUsed(find_result); - added_columns.appendFromBlock(*mapped.block, mapped.row_num, join_features.add_missing); + added_columns.appendFromBlock(&mapped, join_features.add_missing); if (join_features.is_any_or_semi_join) { @@ -477,7 +482,7 @@ template template ColumnPtr HashJoinMethods::buildAdditionalFilter( size_t left_start_row, - const std::vector & selected_rows, + const std::vector & selected_rows, const std::vector & row_replicate_offset, AddedColumns & added_columns) { @@ -489,7 +494,7 @@ ColumnPtr HashJoinMethods::buildAdditionalFilter result_column = ColumnUInt8::create(); break; } - const Block & sample_right_block = *selected_rows.begin()->block; + const Block & sample_right_block = *((*selected_rows.begin())->block); if (!sample_right_block || !added_columns.additional_filter_expression) { auto filter = ColumnUInt8::create(); @@ -519,8 +524,8 @@ ColumnPtr HashJoinMethods::buildAdditionalFilter auto new_col = col.column->cloneEmpty(); for (const auto & selected_row : selected_rows) { - const auto & src_col = selected_row.block->getByPosition(right_col_pos); - new_col->insertFrom(*src_col.column, selected_row.row_num); + const auto & src_col = selected_row->block->getByPosition(right_col_pos); + new_col->insertFrom(*src_col.column, selected_row->row_num); } executed_block.insert({std::move(new_col), col.type, col.name}); } @@ -700,26 +705,24 @@ size_t HashJoinMethods::joinRightColumnsWithAddt { // For inner join, we need mark each right row'flag, because we only use each right row once. auto used_once = used_flags.template setUsedOnce( - selected_right_row_it->block, selected_right_row_it->row_num, 0); + (*selected_right_row_it)->block, (*selected_right_row_it)->row_num, 0); if (used_once) { any_matched = true; total_added_rows += 1; - added_columns.appendFromBlock( - *selected_right_row_it->block, selected_right_row_it->row_num, join_features.add_missing); + added_columns.appendFromBlock(*selected_right_row_it, join_features.add_missing); } } } else { auto used_once = used_flags.template setUsedOnce( - selected_right_row_it->block, selected_right_row_it->row_num, 0); + (*selected_right_row_it)->block, (*selected_right_row_it)->row_num, 0); if (used_once) { any_matched = true; total_added_rows += 1; - added_columns.appendFromBlock( - *selected_right_row_it->block, selected_right_row_it->row_num, join_features.add_missing); + added_columns.appendFromBlock(*selected_right_row_it, join_features.add_missing); } } } @@ -727,16 +730,14 @@ size_t HashJoinMethods::joinRightColumnsWithAddt { any_matched = true; if constexpr (join_features.right && join_features.need_flags) - used_flags.template setUsed(selected_right_row_it->block, selected_right_row_it->row_num, 0); + used_flags.template setUsed((*selected_right_row_it)->block, (*selected_right_row_it)->row_num, 0); } else { any_matched = true; total_added_rows += 1; - added_columns.appendFromBlock( - *selected_right_row_it->block, selected_right_row_it->row_num, join_features.add_missing); - used_flags.template setUsed( - selected_right_row_it->block, selected_right_row_it->row_num, 0); + added_columns.appendFromBlock(*selected_right_row_it, join_features.add_missing); + used_flags.template setUsed((*selected_right_row_it)->block, (*selected_right_row_it)->row_num, 0); } } @@ -756,8 +757,7 @@ size_t HashJoinMethods::joinRightColumnsWithAddt if (filter_flags[replicated_row]) { any_matched = true; - added_columns.appendFromBlock( - *selected_right_row_it->block, selected_right_row_it->row_num, join_features.add_missing); + added_columns.appendFromBlock(*selected_right_row_it, join_features.add_missing); total_added_rows += 1; } ++selected_right_row_it; @@ -767,8 +767,7 @@ size_t HashJoinMethods::joinRightColumnsWithAddt if (filter_flags[replicated_row]) { any_matched = true; - added_columns.appendFromBlock( - *selected_right_row_it->block, selected_right_row_it->row_num, join_features.add_missing); + added_columns.appendFromBlock(*selected_right_row_it, join_features.add_missing); total_added_rows += 1; selected_right_row_it = selected_right_row_it + row_replicate_offset[i] - replicated_row; break; diff --git a/src/Interpreters/HashJoin/KnowRowsHolder.h b/src/Interpreters/HashJoin/KnowRowsHolder.h index d51c96893c5..9223e98d13c 100644 --- a/src/Interpreters/HashJoin/KnowRowsHolder.h +++ b/src/Interpreters/HashJoin/KnowRowsHolder.h @@ -104,7 +104,7 @@ void addFoundRowAll( { if (!known_rows.isKnown(std::make_pair(it->block, it->row_num))) { - added.appendFromBlock(*it->block, it->row_num, false); + added.appendFromBlock(*it, false); ++current_offset; if (!new_known_rows_ptr) { @@ -124,11 +124,16 @@ void addFoundRowAll( known_rows.add(std::cbegin(*new_known_rows_ptr), std::cend(*new_known_rows_ptr)); } } + else if constexpr (AddedColumns::isLazy()) + { + added.appendFromBlock(&mapped, false); + current_offset += mapped.rows; + } else { for (auto it = mapped.begin(); it.ok(); ++it) { - added.appendFromBlock(*it->block, it->row_num, false); + added.appendFromBlock(*it, false); ++current_offset; } } diff --git a/src/Interpreters/RowRefs.cpp b/src/Interpreters/RowRefs.cpp index 9785ba46dab..1b397ab56ef 100644 --- a/src/Interpreters/RowRefs.cpp +++ b/src/Interpreters/RowRefs.cpp @@ -144,7 +144,7 @@ public: return low; } - RowRef findAsof(const IColumn & asof_column, size_t row_num) override + RowRef * findAsof(const IColumn & asof_column, size_t row_num) override { sort(); @@ -156,10 +156,10 @@ public: if (pos != entries.size()) { size_t row_ref_index = entries[pos].row_ref_index; - return row_refs[row_ref_index]; + return &row_refs[row_ref_index]; } - return {nullptr, 0}; + return nullptr; } private: diff --git a/src/Interpreters/RowRefs.h b/src/Interpreters/RowRefs.h index 650b2311ba7..7c98c47dd11 100644 --- a/src/Interpreters/RowRefs.h +++ b/src/Interpreters/RowRefs.h @@ -122,7 +122,7 @@ struct RowRefList : RowRef }; RowRefList() {} /// NOLINT - RowRefList(const Block * block_, size_t row_num_) : RowRef(block_, row_num_) {} + RowRefList(const Block * block_, size_t row_num_) : RowRef(block_, row_num_), rows(1) {} ForwardIterator begin() const { return ForwardIterator(this); } @@ -135,8 +135,11 @@ struct RowRefList : RowRef *next = Batch(nullptr); } next = next->insert(std::move(row_ref), pool); + ++rows; } +public: + SizeT rows = 0; private: Batch * next = nullptr; }; @@ -158,7 +161,7 @@ struct SortedLookupVectorBase virtual void insert(const IColumn &, const Block *, size_t) = 0; // This needs to be synchronized internally - virtual RowRef findAsof(const IColumn &, size_t) = 0; + virtual RowRef * findAsof(const IColumn &, size_t) = 0; }; diff --git a/src/Interpreters/TableJoin.cpp b/src/Interpreters/TableJoin.cpp index c8c926db13c..138085f0710 100644 --- a/src/Interpreters/TableJoin.cpp +++ b/src/Interpreters/TableJoin.cpp @@ -115,6 +115,7 @@ TableJoin::TableJoin(const Settings & settings, VolumePtr tmp_volume_, Temporary , partial_merge_join_left_table_buffer_bytes(settings.partial_merge_join_left_table_buffer_bytes) , max_files_to_merge(settings.join_on_disk_max_files_to_merge) , temporary_files_codec(settings.temporary_files_codec) + , output_by_rowlist_perkey_rows_threshold(settings.join_output_by_rowlist_perkey_rows_threshold) , max_memory_usage(settings.max_memory_usage) , tmp_volume(tmp_volume_) , tmp_data(tmp_data_) diff --git a/src/Interpreters/TableJoin.h b/src/Interpreters/TableJoin.h index 3f2bebb5816..4d626084d81 100644 --- a/src/Interpreters/TableJoin.h +++ b/src/Interpreters/TableJoin.h @@ -148,6 +148,7 @@ private: const size_t partial_merge_join_left_table_buffer_bytes = 0; const size_t max_files_to_merge = 0; const String temporary_files_codec = "LZ4"; + const size_t output_by_rowlist_perkey_rows_threshold = 0; /// Value if setting max_memory_usage for query, can be used when max_bytes_in_join is not specified. size_t max_memory_usage = 0; @@ -295,6 +296,7 @@ public: return join_use_nulls && isRightOrFull(kind()); } + size_t outputByRowListPerkeyRowsThreshold() const { return output_by_rowlist_perkey_rows_threshold; } size_t defaultMaxBytes() const { return default_max_bytes; } size_t maxJoinedBlockRows() const { return max_joined_block_rows; } size_t maxRowsInRightBlock() const { return partial_merge_join_rows_in_right_blocks; } diff --git a/src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp b/src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp index ed91913de4d..fb56fdd4fe0 100644 --- a/src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp +++ b/src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp @@ -133,16 +133,31 @@ static ColumnWithTypeAndName readColumnWithStringData(const std::shared_ptr buffer = chunk.value_data(); const size_t chunk_length = chunk.length(); - for (size_t offset_i = 0; offset_i != chunk_length; ++offset_i) + const size_t null_count = chunk.null_count(); + if (null_count == 0) { - if (!chunk.IsNull(offset_i) && buffer) + for (size_t offset_i = 0; offset_i != chunk_length; ++offset_i) { const auto * raw_data = buffer->data() + chunk.value_offset(offset_i); column_chars_t.insert_assume_reserved(raw_data, raw_data + chunk.value_length(offset_i)); - } - column_chars_t.emplace_back('\0'); + column_chars_t.emplace_back('\0'); - column_offsets.emplace_back(column_chars_t.size()); + column_offsets.emplace_back(column_chars_t.size()); + } + } + else + { + for (size_t offset_i = 0; offset_i != chunk_length; ++offset_i) + { + if (!chunk.IsNull(offset_i) && buffer) + { + const auto * raw_data = buffer->data() + chunk.value_offset(offset_i); + column_chars_t.insert_assume_reserved(raw_data, raw_data + chunk.value_length(offset_i)); + } + column_chars_t.emplace_back('\0'); + + column_offsets.emplace_back(column_chars_t.size()); + } } } return {std::move(internal_column), std::move(internal_type), column_name}; diff --git a/src/Processors/Formats/Impl/NativeORCBlockInputFormat.cpp b/src/Processors/Formats/Impl/NativeORCBlockInputFormat.cpp index 58bec8120f1..e68286bfcc5 100644 --- a/src/Processors/Formats/Impl/NativeORCBlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/NativeORCBlockInputFormat.cpp @@ -1143,24 +1143,42 @@ readColumnWithStringData(const orc::ColumnVectorBatch * orc_column, const orc::T reserver_size += 1; } - column_chars_t.reserve(reserver_size); - column_offsets.reserve(orc_str_column->numElements); + column_chars_t.resize_exact(reserver_size); + column_offsets.resize_exact(orc_str_column->numElements); size_t curr_offset = 0; - for (size_t i = 0; i < orc_str_column->numElements; ++i) + if (!orc_str_column->hasNulls) { - if (!orc_str_column->hasNulls || orc_str_column->notNull[i]) + for (size_t i = 0; i < orc_str_column->numElements; ++i) { const auto * buf = orc_str_column->data[i]; size_t buf_size = orc_str_column->length[i]; - column_chars_t.insert_assume_reserved(buf, buf + buf_size); + memcpy(&column_chars_t[curr_offset], buf, buf_size); curr_offset += buf_size; + + column_chars_t[curr_offset] = 0; + ++curr_offset; + + column_offsets[i] = curr_offset; } + } + else + { + for (size_t i = 0; i < orc_str_column->numElements; ++i) + { + if (orc_str_column->notNull[i]) + { + const auto * buf = orc_str_column->data[i]; + size_t buf_size = orc_str_column->length[i]; + memcpy(&column_chars_t[curr_offset], buf, buf_size); + curr_offset += buf_size; + } - column_chars_t.push_back(0); - ++curr_offset; + column_chars_t[curr_offset] = 0; + ++curr_offset; - column_offsets.push_back(curr_offset); + column_offsets[i] = curr_offset; + } } return {std::move(internal_column), std::move(internal_type), column_name}; } diff --git a/tests/integration/test_storage_mysql/test.py b/tests/integration/test_storage_mysql/test.py index 5948954ff5f..c724c5bb498 100644 --- a/tests/integration/test_storage_mysql/test.py +++ b/tests/integration/test_storage_mysql/test.py @@ -445,7 +445,7 @@ def test_mysql_distributed(started_cluster): query = "SELECT * FROM (" for i in range(3): query += "SELECT name FROM test_replicas UNION DISTINCT " - query += "SELECT name FROM test_replicas)" + query += "SELECT name FROM test_replicas) ORDER BY name" result = node2.query(query) assert result == "host2\nhost3\nhost4\n" @@ -827,6 +827,9 @@ def test_settings(started_cluster): f"with settings: connect_timeout={connect_timeout}, read_write_timeout={rw_timeout}" ) + node1.query("DROP DATABASE IF EXISTS m") + node1.query("DROP DATABASE IF EXISTS mm") + rw_timeout = 40123001 connect_timeout = 40123002 node1.query( @@ -855,6 +858,9 @@ def test_settings(started_cluster): f"with settings: connect_timeout={connect_timeout}, read_write_timeout={rw_timeout}" ) + node1.query("DROP DATABASE m") + node1.query("DROP DATABASE mm") + drop_mysql_table(conn, table_name) conn.close() @@ -930,6 +936,9 @@ def test_joins(started_cluster): conn.commit() + node1.query("DROP TABLE IF EXISTS test_joins_table_users") + node1.query("DROP TABLE IF EXISTS test_joins_table_tickets") + node1.query( """ CREATE TABLE test_joins_table_users @@ -964,6 +973,9 @@ def test_joins(started_cluster): """ ) == "281607\tFeedback\t2024-06-25 12:09:41\tuser@example.com\n" + node1.query("DROP TABLE test_joins_table_users") + node1.query("DROP TABLE test_joins_table_tickets") + if __name__ == "__main__": with contextmanager(started_cluster)() as cluster: diff --git a/tests/performance/all_join_opt.xml b/tests/performance/all_join_opt.xml new file mode 100644 index 00000000000..0ab9c39f67c --- /dev/null +++ b/tests/performance/all_join_opt.xml @@ -0,0 +1,15 @@ + + CREATE TABLE test (a Int64, b String, c LowCardinality(String)) ENGINE = MergeTree() ORDER BY a + CREATE TABLE test1 (a Int64, b String, c LowCardinality(String)) ENGINE = MergeTree() ORDER BY a + + INSERT INTO test SELECT number % 10000, number % 10000, number % 10000 FROM numbers(10000000) + INSERT INTO test1 SELECT number % 1000 , number % 1000, number % 1000 FROM numbers(100000) + + SELECT MAX(test1.a) FROM test INNER JOIN test1 on test.b = test1.b + SELECT MAX(test1.a) FROM test LEFT JOIN test1 on test.b = test1.b + SELECT MAX(test1.a) FROM test RIGHT JOIN test1 on test.b = test1.b + SELECT MAX(test1.a) FROM test FULL JOIN test1 on test.b = test1.b + + DROP TABLE IF EXISTS test + DROP TABLE IF EXISTS test1 + \ No newline at end of file