diff --git a/src/Core/Settings.h b/src/Core/Settings.h index d8837d26e54..c1433ca7250 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -922,6 +922,8 @@ class IColumn; M(Bool, implicit_transaction, false, "If enabled and not already inside a transaction, wraps the query inside a full transaction (begin + commit or rollback)", 0) \ M(UInt64, grace_hash_join_initial_buckets, 1, "Initial number of grace hash join buckets", 0) \ M(UInt64, grace_hash_join_max_buckets, 1024, "Limit on the number of grace hash join buckets", 0) \ + M(Int32, join_to_sort_perkey_rows_threshold, 40, "The lower limit of per-key average rows in the right table to determine whether to sort it in hash join.", 0) \ + M(Int32, join_to_sort_table_rows_threshold, 10000, "The upper limit of rows in the right table to determine whether to sort it in hash join.", 0) \ M(Timezone, session_timezone, "", "This setting can be removed in the future due to potential caveats. It is experimental and is not suitable for production usage. The default timezone for current session or query. The server default timezone if empty.", 0) \ M(Bool, use_hive_partitioning, false, "Allows to use hive partitioning for File, URL, S3, AzureBlobStorage and HDFS engines.", 0)\ \ diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 2415323b4a0..b975c6b2fad 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -516,6 +516,259 @@ static std::initializer_list col >= '2023-01-01' AND col <= '2023-12-31')"}, + {"extract_key_value_pairs_max_pairs_per_row", 0, 0, "Max number of pairs that can be produced by the `extractKeyValuePairs` function. Used as a safeguard against consuming too much memory."}, + {"default_view_definer", "CURRENT_USER", "CURRENT_USER", "Allows to set default `DEFINER` option while creating a view"}, + {"default_materialized_view_sql_security", "DEFINER", "DEFINER", "Allows to set a default value for SQL SECURITY option when creating a materialized view"}, + {"default_normal_view_sql_security", "INVOKER", "INVOKER", "Allows to set default `SQL SECURITY` option while creating a normal view"}, + {"mysql_map_string_to_text_in_show_columns", false, true, "Reduce the configuration effort to connect ClickHouse with BI tools."}, + {"mysql_map_fixed_string_to_text_in_show_columns", false, true, "Reduce the configuration effort to connect ClickHouse with BI tools."}, + }}, + {"24.1", {{"print_pretty_type_names", false, true, "Better user experience."}, + {"input_format_json_read_bools_as_strings", false, true, "Allow to read bools as strings in JSON formats by default"}, + {"output_format_arrow_use_signed_indexes_for_dictionary", false, true, "Use signed indexes type for Arrow dictionaries by default as it's recommended"}, + {"allow_experimental_variant_type", false, false, "Add new experimental Variant type"}, + {"use_variant_as_common_type", false, false, "Allow to use Variant in if/multiIf if there is no common type"}, + {"output_format_arrow_use_64_bit_indexes_for_dictionary", false, false, "Allow to use 64 bit indexes type in Arrow dictionaries"}, + {"parallel_replicas_mark_segment_size", 128, 128, "Add new setting to control segment size in new parallel replicas coordinator implementation"}, + {"ignore_materialized_views_with_dropped_target_table", false, false, "Add new setting to allow to ignore materialized views with dropped target table"}, + {"output_format_compression_level", 3, 3, "Allow to change compression level in the query output"}, + {"output_format_compression_zstd_window_log", 0, 0, "Allow to change zstd window log in the query output when zstd compression is used"}, + {"enable_zstd_qat_codec", false, false, "Add new ZSTD_QAT codec"}, + {"enable_vertical_final", false, true, "Use vertical final by default"}, + {"output_format_arrow_use_64_bit_indexes_for_dictionary", false, false, "Allow to use 64 bit indexes type in Arrow dictionaries"}, + {"max_rows_in_set_to_optimize_join", 100000, 0, "Disable join optimization as it prevents from read in order optimization"}, + {"output_format_pretty_color", true, "auto", "Setting is changed to allow also for auto value, disabling ANSI escapes if output is not a tty"}, + {"function_visible_width_behavior", 0, 1, "We changed the default behavior of `visibleWidth` to be more precise"}, + {"max_estimated_execution_time", 0, 0, "Separate max_execution_time and max_estimated_execution_time"}, + {"iceberg_engine_ignore_schema_evolution", false, false, "Allow to ignore schema evolution in Iceberg table engine"}, + {"optimize_injective_functions_in_group_by", false, true, "Replace injective functions by it's arguments in GROUP BY section in analyzer"}, + {"update_insert_deduplication_token_in_dependent_materialized_views", false, false, "Allow to update insert deduplication token with table identifier during insert in dependent materialized views"}, + {"azure_max_unexpected_write_error_retries", 4, 4, "The maximum number of retries in case of unexpected errors during Azure blob storage write"}, + {"split_parts_ranges_into_intersecting_and_non_intersecting_final", false, true, "Allow to split parts ranges into intersecting and non intersecting during FINAL optimization"}, + {"split_intersecting_parts_ranges_into_layers_final", true, true, "Allow to split intersecting parts ranges into layers during FINAL optimization"}}}, + {"23.12", {{"allow_suspicious_ttl_expressions", true, false, "It is a new setting, and in previous versions the behavior was equivalent to allowing."}, + {"input_format_parquet_allow_missing_columns", false, true, "Allow missing columns in Parquet files by default"}, + {"input_format_orc_allow_missing_columns", false, true, "Allow missing columns in ORC files by default"}, + {"input_format_arrow_allow_missing_columns", false, true, "Allow missing columns in Arrow files by default"}}}, + {"23.11", {{"parsedatetime_parse_without_leading_zeros", false, true, "Improved compatibility with MySQL DATE_FORMAT/STR_TO_DATE"}}}, + {"23.9", {{"optimize_group_by_constant_keys", false, true, "Optimize group by constant keys by default"}, + {"input_format_json_try_infer_named_tuples_from_objects", false, true, "Try to infer named Tuples from JSON objects by default"}, + {"input_format_json_read_numbers_as_strings", false, true, "Allow to read numbers as strings in JSON formats by default"}, + {"input_format_json_read_arrays_as_strings", false, true, "Allow to read arrays as strings in JSON formats by default"}, + {"input_format_json_infer_incomplete_types_as_strings", false, true, "Allow to infer incomplete types as Strings in JSON formats by default"}, + {"input_format_json_try_infer_numbers_from_strings", true, false, "Don't infer numbers from strings in JSON formats by default to prevent possible parsing errors"}, + {"http_write_exception_in_output_format", false, true, "Output valid JSON/XML on exception in HTTP streaming."}}}, + {"23.8", {{"rewrite_count_distinct_if_with_count_distinct_implementation", false, true, "Rewrite countDistinctIf with count_distinct_implementation configuration"}}}, + {"23.7", {{"function_sleep_max_microseconds_per_block", 0, 3000000, "In previous versions, the maximum sleep time of 3 seconds was applied only for `sleep`, but not for `sleepEachRow` function. In the new version, we introduce this setting. If you set compatibility with the previous versions, we will disable the limit altogether."}}}, + {"23.6", {{"http_send_timeout", 180, 30, "3 minutes seems crazy long. Note that this is timeout for a single network write call, not for the whole upload operation."}, + {"http_receive_timeout", 180, 30, "See http_send_timeout."}}}, + {"23.5", {{"input_format_parquet_preserve_order", true, false, "Allow Parquet reader to reorder rows for better parallelism."}, + {"parallelize_output_from_storages", false, true, "Allow parallelism when executing queries that read from file/url/s3/etc. This may reorder rows."}, + {"use_with_fill_by_sorting_prefix", false, true, "Columns preceding WITH FILL columns in ORDER BY clause form sorting prefix. Rows with different values in sorting prefix are filled independently"}, + {"output_format_parquet_compliant_nested_types", false, true, "Change an internal field name in output Parquet file schema."}}}, + {"23.4", {{"allow_suspicious_indices", true, false, "If true, index can defined with identical expressions"}, + {"allow_nonconst_timezone_arguments", true, false, "Allow non-const timezone arguments in certain time-related functions like toTimeZone(), fromUnixTimestamp*(), snowflakeToDateTime*()."}, + {"connect_timeout_with_failover_ms", 50, 1000, "Increase default connect timeout because of async connect"}, + {"connect_timeout_with_failover_secure_ms", 100, 1000, "Increase default secure connect timeout because of async connect"}, + {"hedged_connection_timeout_ms", 100, 50, "Start new connection in hedged requests after 50 ms instead of 100 to correspond with previous connect timeout"}, + {"formatdatetime_f_prints_single_zero", true, false, "Improved compatibility with MySQL DATE_FORMAT()/STR_TO_DATE()"}, + {"formatdatetime_parsedatetime_m_is_month_name", false, true, "Improved compatibility with MySQL DATE_FORMAT/STR_TO_DATE"}}}, + {"23.3", {{"output_format_parquet_version", "1.0", "2.latest", "Use latest Parquet format version for output format"}, + {"input_format_json_ignore_unknown_keys_in_named_tuple", false, true, "Improve parsing JSON objects as named tuples"}, + {"input_format_native_allow_types_conversion", false, true, "Allow types conversion in Native input forma"}, + {"output_format_arrow_compression_method", "none", "lz4_frame", "Use lz4 compression in Arrow output format by default"}, + {"output_format_parquet_compression_method", "snappy", "lz4", "Use lz4 compression in Parquet output format by default"}, + {"output_format_orc_compression_method", "none", "lz4_frame", "Use lz4 compression in ORC output format by default"}, + {"async_query_sending_for_remote", false, true, "Create connections and send query async across shards"}}}, + {"23.2", {{"output_format_parquet_fixed_string_as_fixed_byte_array", false, true, "Use Parquet FIXED_LENGTH_BYTE_ARRAY type for FixedString by default"}, + {"output_format_arrow_fixed_string_as_fixed_byte_array", false, true, "Use Arrow FIXED_SIZE_BINARY type for FixedString by default"}, + {"query_plan_remove_redundant_distinct", false, true, "Remove redundant Distinct step in query plan"}, + {"optimize_duplicate_order_by_and_distinct", true, false, "Remove duplicate ORDER BY and DISTINCT if it's possible"}, + {"insert_keeper_max_retries", 0, 20, "Enable reconnections to Keeper on INSERT, improve reliability"}}}, + {"23.1", {{"input_format_json_read_objects_as_strings", 0, 1, "Enable reading nested json objects as strings while object type is experimental"}, + {"input_format_json_defaults_for_missing_elements_in_named_tuple", false, true, "Allow missing elements in JSON objects while reading named tuples by default"}, + {"input_format_csv_detect_header", false, true, "Detect header in CSV format by default"}, + {"input_format_tsv_detect_header", false, true, "Detect header in TSV format by default"}, + {"input_format_custom_detect_header", false, true, "Detect header in CustomSeparated format by default"}, + {"query_plan_remove_redundant_sorting", false, true, "Remove redundant sorting in query plan. For example, sorting steps related to ORDER BY clauses in subqueries"}}}, + {"22.12", {{"max_size_to_preallocate_for_aggregation", 10'000'000, 100'000'000, "This optimizes performance"}, + {"query_plan_aggregation_in_order", 0, 1, "Enable some refactoring around query plan"}, + {"format_binary_max_string_size", 0, 1_GiB, "Prevent allocating large amount of memory"}}}, + {"22.11", {{"use_structure_from_insertion_table_in_table_functions", 0, 2, "Improve using structure from insertion table in table functions"}}}, + {"22.9", {{"force_grouping_standard_compatibility", false, true, "Make GROUPING function output the same as in SQL standard and other DBMS"}}}, + {"22.7", {{"cross_to_inner_join_rewrite", 1, 2, "Force rewrite comma join to inner"}, + {"enable_positional_arguments", false, true, "Enable positional arguments feature by default"}, + {"format_csv_allow_single_quotes", true, false, "Most tools don't treat single quote in CSV specially, don't do it by default too"}}}, + {"22.6", {{"output_format_json_named_tuples_as_objects", false, true, "Allow to serialize named tuples as JSON objects in JSON formats by default"}, + {"input_format_skip_unknown_fields", false, true, "Optimize reading subset of columns for some input formats"}}}, + {"22.5", {{"memory_overcommit_ratio_denominator", 0, 1073741824, "Enable memory overcommit feature by default"}, + {"memory_overcommit_ratio_denominator_for_user", 0, 1073741824, "Enable memory overcommit feature by default"}}}, + {"22.4", {{"allow_settings_after_format_in_insert", true, false, "Do not allow SETTINGS after FORMAT for INSERT queries because ClickHouse interpret SETTINGS as some values, which is misleading"}}}, + {"22.3", {{"cast_ipv4_ipv6_default_on_conversion_error", true, false, "Make functions cast(value, 'IPv4') and cast(value, 'IPv6') behave same as toIPv4 and toIPv6 functions"}}}, + {"21.12", {{"stream_like_engine_allow_direct_select", true, false, "Do not allow direct select for Kafka/RabbitMQ/FileLog by default"}}}, + {"21.9", {{"output_format_decimal_trailing_zeros", true, false, "Do not output trailing zeros in text representation of Decimal types by default for better looking output"}, + {"use_hedged_requests", false, true, "Enable Hedged Requests feature by default"}}}, + {"21.7", {{"legacy_column_name_of_tuple_literal", true, false, "Add this setting only for compatibility reasons. It makes sense to set to 'true', while doing rolling update of cluster from version lower than 21.7 to higher"}}}, + {"21.5", {{"async_socket_for_remote", false, true, "Fix all problems and turn on asynchronous reads from socket for remote queries by default again"}}}, + {"21.3", {{"async_socket_for_remote", true, false, "Turn off asynchronous reads from socket for remote queries because of some problems"}, + {"optimize_normalize_count_variants", false, true, "Rewrite aggregate functions that semantically equals to count() as count() by default"}, + {"normalize_function_names", false, true, "Normalize function names to their canonical names, this was needed for projection query routing"}}}, + {"21.2", {{"enable_global_with_statement", false, true, "Propagate WITH statements to UNION queries and all subqueries by default"}}}, + {"21.1", {{"insert_quorum_parallel", false, true, "Use parallel quorum inserts by default. It is significantly more convenient to use than sequential quorum inserts"}, + {"input_format_null_as_default", false, true, "Allow to insert NULL as default for input formats by default"}, + {"optimize_on_insert", false, true, "Enable data optimization on INSERT by default for better user experience"}, + {"use_compact_format_in_distributed_parts_names", false, true, "Use compact format for async INSERT into Distributed tables by default"}}}, + {"20.10", {{"format_regexp_escaping_rule", "Escaped", "Raw", "Use Raw as default escaping rule for Regexp format to male the behaviour more like to what users expect"}}}, + {"20.7", {{"show_table_uuid_in_table_create_query_if_not_nil", true, false, "Stop showing UID of the table in its CREATE query for Engine=Atomic"}}}, + {"20.5", {{"input_format_with_names_use_header", false, true, "Enable using header with names for formats with WithNames/WithNamesAndTypes suffixes"}, + {"allow_suspicious_codecs", true, false, "Don't allow to specify meaningless compression codecs"}}}, + {"20.4", {{"validate_polygons", false, true, "Throw exception if polygon is invalid in function pointInPolygon by default instead of returning possibly wrong results"}}}, + {"19.18", {{"enable_scalar_subquery_optimization", false, true, "Prevent scalar subqueries from (de)serializing large scalar values and possibly avoid running the same subquery more than once"}}}, + {"19.14", {{"any_join_distinct_right_table_keys", true, false, "Disable ANY RIGHT and ANY FULL JOINs by default to avoid inconsistency"}}}, + {"19.12", {{"input_format_defaults_for_omitted_fields", false, true, "Enable calculation of complex default expressions for omitted fields for some input formats, because it should be the expected behaviour"}}}, + {"19.5", {{"max_partitions_per_insert_block", 0, 100, "Add a limit for the number of partitions in one block"}}}, + {"18.12.17", {{"enable_optimize_predicate_expression", 0, 1, "Optimize predicates to subqueries by default"}}}, }; diff --git a/src/Interpreters/HashJoin/AddedColumns.cpp b/src/Interpreters/HashJoin/AddedColumns.cpp index 21cb6e401ed..d70781d2fb3 100644 --- a/src/Interpreters/HashJoin/AddedColumns.cpp +++ b/src/Interpreters/HashJoin/AddedColumns.cpp @@ -20,10 +20,13 @@ void AddedColumns::buildOutput() {} template<> void AddedColumns::buildJoinGetOutput() {} +<<<<<<< HEAD template<> template void AddedColumns::buildOutputFromBlocks() {} +======= +>>>>>>> add threshold for table rows template<> void AddedColumns::buildOutput() @@ -32,9 +35,15 @@ void AddedColumns::buildOutput() buildOutputFromBlocks(); else { +<<<<<<< HEAD if (join_data_avg_perkey_rows < output_by_row_list_threshold) buildOutputFromBlocks(); else +======= + if (join_data_avg_perkey_rows < sort_right_perkey_rows_threshold) + buildOutputFromBlocks(); + else if (join_data_sorted) +>>>>>>> add threshold for table rows { for (size_t i = 0; i < this->size(); ++i) { @@ -44,14 +53,19 @@ void AddedColumns::buildOutput() if (row_ref_i) { const RowRefList * row_ref_list = reinterpret_cast(row_ref_i); +<<<<<<< HEAD for (auto it = row_ref_list->begin(); it.ok(); ++it) col->insertFrom(*it->block->getByPosition(right_indexes[i]).column, it->row_num); +======= + col->insertRangeFrom(*row_ref_list->block->getByPosition(right_indexes[i]).column, row_ref_list->row_num, row_ref_list->rows); +>>>>>>> add threshold for table rows } else type_name[i].type->insertDefaultInto(*col); } } } +<<<<<<< HEAD } } @@ -74,6 +88,25 @@ void AddedColumns::buildJoinGetOutput() nullable_col->insertFromNotNullable(*column_from_block.column, row_ref->row_num); else col->insertFrom(*column_from_block.column, row_ref->row_num); +======= + else + { + for (size_t i = 0; i < this->size(); ++i) + { + auto & col = columns[i]; + for (auto row_ref_i : lazy_output.row_refs) + { + if (row_ref_i) + { + const RowRefList * row_ref_list = reinterpret_cast(row_ref_i); + for (auto it = row_ref_list->begin(); it.ok(); ++it) + col->insertFrom(*it->block->getByPosition(right_indexes[i]).column, it->row_num); + } + else + type_name[i].type->insertDefaultInto(*col); + } + } +>>>>>>> add threshold for table rows } } } @@ -82,7 +115,11 @@ template<> template void AddedColumns::buildOutputFromBlocks() { +<<<<<<< HEAD if (this->size() == 0) +======= + if (this->size() == 0) +>>>>>>> add threshold for table rows return; std::vector blocks; std::vector row_nums; @@ -123,6 +160,32 @@ void AddedColumns::buildOutputFromBlocks() col->insertFrom(*blocks[j]->getByPosition(right_indexes[i]).column, row_nums[j]); else type_name[i].type->insertDefaultInto(*col); +<<<<<<< HEAD +======= + } + } +} + +template<> +void AddedColumns::buildJoinGetOutput() +{ + for (size_t i = 0; i < this->size(); ++i) + { + auto & col = columns[i]; + for (auto row_ref_i : lazy_output.row_refs) + { + if (!row_ref_i) + { + type_name[i].type->insertDefaultInto(*col); + continue; + } + const auto * row_ref = reinterpret_cast(row_ref_i); + const auto & column_from_block = row_ref->block->getByPosition(right_indexes[i]); + if (auto * nullable_col = typeid_cast(col.get()); nullable_col && !column_from_block.column->isNullable()) + nullable_col->insertFromNotNullable(*column_from_block.column, row_ref->row_num); + else + col->insertFrom(*column_from_block.column, row_ref->row_num); +>>>>>>> add threshold for table rows } } } diff --git a/src/Interpreters/HashJoin/AddedColumns.h b/src/Interpreters/HashJoin/AddedColumns.h index f1b95a63be6..5ae69fbbf66 100644 --- a/src/Interpreters/HashJoin/AddedColumns.h +++ b/src/Interpreters/HashJoin/AddedColumns.h @@ -196,6 +196,12 @@ private: } } + /** Build output from the blocks that extract from `RowRef` or `RowRefList`, to avoid block cache miss which may cause performance slow down. + * And This problem would happen it we directly build output from `RowRef` or `RowRefList`. + */ + template + void buildOutputFromBlocks(); + MutableColumns columns; bool is_join_get; std::vector right_indexes; diff --git a/src/Interpreters/HashJoin/HashJoin.cpp b/src/Interpreters/HashJoin/HashJoin.cpp index 9c07a71e614..6f332118f8a 100644 --- a/src/Interpreters/HashJoin/HashJoin.cpp +++ b/src/Interpreters/HashJoin/HashJoin.cpp @@ -649,7 +649,6 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits) } data->keys_to_join = total_rows; shrinkStoredBlocksToFit(total_bytes); - return table_join->sizeLimits().check(total_rows, total_bytes, "JOIN", ErrorCodes::SET_SIZE_LIMIT_EXCEEDED); } @@ -1361,4 +1360,87 @@ bool HashJoin::needUsedFlagsForPerRightTableRow(std::shared_ptr table return false; } +template +void HashJoin::tryRerangeRightTableDataImpl(Map & map [[maybe_unused]]) +{ + constexpr JoinFeatures join_features; + if constexpr (join_features.is_all_join && (join_features.left || join_features.inner)) + { + auto merge_rows_into_one_block = [&](BlocksList & blocks, RowRefList & rows_ref) + { + auto it = rows_ref.begin(); + if (it.ok()) + { + if (blocks.empty() || blocks.back().rows() > DEFAULT_BLOCK_SIZE) + blocks.emplace_back(it->block->cloneEmpty()); + } + else + { + return; + } + auto & block = blocks.back(); + size_t start_row = block.rows(); + for (; it.ok(); ++it) + { + for (size_t i = 0; i < block.columns(); ++i) + { + auto & col = *(block.getByPosition(i).column->assumeMutable()); + col.insertFrom(*it->block->getByPosition(i).column, it->row_num); + } + } + if (block.rows() > start_row) + { + RowRefList new_rows_ref(&block, start_row, block.rows() - start_row); + rows_ref = std::move(new_rows_ref); + } + }; + + auto visit_rows_map = [&](BlocksList & blocks, MapsAll & rows_map) + { + switch (data->type) + { + #define M(TYPE) \ + case Type::TYPE: \ + {\ + rows_map.TYPE->forEachMapped([&](RowRefList & rows_ref) { merge_rows_into_one_block(blocks, rows_ref); }); \ + break; \ + } + APPLY_FOR_JOIN_VARIANTS(M) + #undef M + default: + break; + } + }; + BlocksList sorted_blocks; + visit_rows_map(sorted_blocks, map); + data->blocks.swap(sorted_blocks); + } +} + +void HashJoin::tryRerangeRightTableData() +{ + if ((kind != JoinKind::Inner && kind != JoinKind::Left) || strictness != JoinStrictness::All || table_join->getMixedJoinExpression()) + return; + + if (!data || data->sorted || data->blocks.empty() || data->maps.size() > 1) + return; + + if (data->keys_to_join == 0) + data->keys_to_join = getTotalRowCount(); + if (sample_block_with_columns_to_add.columns() == 0 || data->rows_to_join > table_join->sortRightTableRowsThreshold() || data->avgPerKeyRows() < table_join->sortRightPerkeyRowsThreshold()) + { + LOG_DEBUG(log, "The joined right table total rows :{}, total keys :{}, columns added:{}", + data->rows_to_join, data->keys_to_join, sample_block_with_columns_to_add.columns()); + return; + } + std::cout << "sort right table rows" << std::endl; + joinDispatch( + kind, + strictness, + data->maps.front(), + [&](auto kind_, auto strictness_, auto & map_) { tryRerangeRightTableDataImpl(map_); }); + std::cout << "sort right finished" << std::endl; + data->sorted = true; +} + } diff --git a/src/Interpreters/HashJoin/HashJoin.h b/src/Interpreters/HashJoin/HashJoin.h index d645b8e9273..230343691ea 100644 --- a/src/Interpreters/HashJoin/HashJoin.h +++ b/src/Interpreters/HashJoin/HashJoin.h @@ -345,11 +345,12 @@ public: size_t blocks_allocated_size = 0; size_t blocks_nullmaps_allocated_size = 0; - /// Number of rows of right table to join size_t rows_to_join = 0; /// Number of keys of right table to join size_t keys_to_join = 0; + /// Whether the right table reranged by key + bool sorted = false; size_t avgPerKeyRows() const { @@ -465,6 +466,10 @@ private: void validateAdditionalFilterExpression(std::shared_ptr additional_filter_expression); bool needUsedFlagsForPerRightTableRow(std::shared_ptr table_join_) const; + + void tryRerangeRightTableData() override; + template + void tryRerangeRightTableDataImpl(Map & map); }; } diff --git a/src/Interpreters/HashJoin/HashJoinMethods.h b/src/Interpreters/HashJoin/HashJoinMethods.h index 97ad57d26ea..9d94c3f62c2 100644 --- a/src/Interpreters/HashJoin/HashJoinMethods.h +++ b/src/Interpreters/HashJoin/HashJoinMethods.h @@ -121,7 +121,142 @@ private: std::vector && key_getter_vector, const std::vector & mapv, AddedColumns & added_columns, - JoinStuff::JoinUsedFlags & used_flags); + JoinStuff::JoinUsedFlags & used_flags) + { + constexpr JoinFeatures join_features; + + size_t rows = added_columns.rows_to_add; + if constexpr (need_filter) + added_columns.filter = IColumn::Filter(rows, 0); + if constexpr (!flag_per_row && (STRICTNESS == JoinStrictness::All || (STRICTNESS == JoinStrictness::Semi && KIND == JoinKind::Right))) + added_columns.output_by_row_list = true; + + Arena pool; + + if constexpr (join_features.need_replication) + added_columns.offsets_to_replicate = std::make_unique(rows); + + IColumn::Offset current_offset = 0; + size_t max_joined_block_rows = added_columns.max_joined_block_rows; + size_t i = 0; + for (; i < rows; ++i) + { + if constexpr (join_features.need_replication) + { + if (unlikely(current_offset >= max_joined_block_rows)) + { + added_columns.offsets_to_replicate->resize_assume_reserved(i); + added_columns.filter.resize_assume_reserved(i); + break; + } + } + + bool right_row_found = false; + + KnownRowsHolder known_rows; + for (size_t onexpr_idx = 0; onexpr_idx < added_columns.join_on_keys.size(); ++onexpr_idx) + { + const auto & join_keys = added_columns.join_on_keys[onexpr_idx]; + if (join_keys.null_map && (*join_keys.null_map)[i]) + continue; + + bool row_acceptable = !join_keys.isRowFiltered(i); + using FindResult = typename KeyGetter::FindResult; + auto find_result = row_acceptable ? key_getter_vector[onexpr_idx].findKey(*(mapv[onexpr_idx]), i, pool) : FindResult(); + + if (find_result.isFound()) + { + right_row_found = true; + auto & mapped = find_result.getMapped(); + if constexpr (join_features.is_asof_join) + { + const IColumn & left_asof_key = added_columns.leftAsofKey(); + + auto row_ref = mapped->findAsof(left_asof_key, i); + if (row_ref && row_ref->block) + { + setUsed(added_columns.filter, i); + if constexpr (flag_per_row) + used_flags.template setUsed(row_ref->block, row_ref->row_num, 0); + else + used_flags.template setUsed(find_result); + + added_columns.appendFromBlock(row_ref, join_features.add_missing); + } + else + addNotFoundRow(added_columns, current_offset); + } + else if constexpr (join_features.is_all_join) + { + setUsed(added_columns.filter, i); + used_flags.template setUsed(find_result); + auto used_flags_opt = join_features.need_flags ? &used_flags : nullptr; + addFoundRowAll(mapped, added_columns, current_offset, known_rows, used_flags_opt); + } + else if constexpr ((join_features.is_any_join || join_features.is_semi_join) && join_features.right) + { + /// Use first appeared left key + it needs left columns replication + bool used_once = used_flags.template setUsedOnce(find_result); + if (used_once) + { + auto used_flags_opt = join_features.need_flags ? &used_flags : nullptr; + setUsed(added_columns.filter, i); + addFoundRowAll( + mapped, added_columns, current_offset, known_rows, used_flags_opt); + } + } + else if constexpr (join_features.is_any_join && KIND == JoinKind::Inner) + { + bool used_once = used_flags.template setUsedOnce(find_result); + + /// Use first appeared left key only + if (used_once) + { + setUsed(added_columns.filter, i); + added_columns.appendFromBlock(&mapped, join_features.add_missing); + } + + break; + } + else if constexpr (join_features.is_any_join && join_features.full) + { + /// TODO + } + else if constexpr (join_features.is_anti_join) + { + if constexpr (join_features.right && join_features.need_flags) + used_flags.template setUsed(find_result); + } + else /// ANY LEFT, SEMI LEFT, old ANY (RightAny) + { + setUsed(added_columns.filter, i); + used_flags.template setUsed(find_result); + added_columns.appendFromBlock(&mapped, join_features.add_missing); + + if (join_features.is_any_or_semi_join) + { + break; + } + } + } + } + + if (!right_row_found) + { + if constexpr (join_features.is_anti_join && join_features.left) + setUsed(added_columns.filter, i); + addNotFoundRow(added_columns, current_offset); + } + + if constexpr (join_features.need_replication) + { + (*added_columns.offsets_to_replicate)[i] = current_offset; + } + } + + added_columns.applyLazyDefaults(); + return i; + } template static void setUsed(IColumn::Filter & filter [[maybe_unused]], size_t pos [[maybe_unused]]); @@ -131,7 +266,120 @@ private: size_t left_start_row, const std::vector & selected_rows, const std::vector & row_replicate_offset, - AddedColumns & added_columns); + AddedColumns & added_columns) + { + ColumnPtr result_column; + do + { + if (selected_rows.empty()) + { + result_column = ColumnUInt8::create(); + break; + } + const Block & sample_right_block = *((*selected_rows.begin())->block); + if (!sample_right_block || !added_columns.additional_filter_expression) + { + auto filter = ColumnUInt8::create(); + filter->insertMany(1, selected_rows.size()); + result_column = std::move(filter); + break; + } + + auto required_cols = added_columns.additional_filter_expression->getRequiredColumnsWithTypes(); + if (required_cols.empty()) + { + Block block; + added_columns.additional_filter_expression->execute(block); + result_column = block.getByPosition(0).column->cloneResized(selected_rows.size()); + break; + } + NameSet required_column_names; + for (auto & col : required_cols) + required_column_names.insert(col.name); + + Block executed_block; + size_t right_col_pos = 0; + for (const auto & col : sample_right_block.getColumnsWithTypeAndName()) + { + if (required_column_names.contains(col.name)) + { + auto new_col = col.column->cloneEmpty(); + for (const auto & selected_row : selected_rows) + { + const auto & src_col = selected_row->block->getByPosition(right_col_pos); + new_col->insertFrom(*src_col.column, selected_row->row_num); + } + executed_block.insert({std::move(new_col), col.type, col.name}); + } + right_col_pos += 1; + } + if (!executed_block) + { + result_column = ColumnUInt8::create(); + break; + } + + for (const auto & col_name : required_column_names) + { + const auto * src_col = added_columns.left_block.findByName(col_name); + if (!src_col) + continue; + auto new_col = src_col->column->cloneEmpty(); + size_t prev_left_offset = 0; + for (size_t i = 1; i < row_replicate_offset.size(); ++i) + { + const size_t & left_offset = row_replicate_offset[i]; + size_t rows = left_offset - prev_left_offset; + if (rows) + new_col->insertManyFrom(*src_col->column, left_start_row + i - 1, rows); + prev_left_offset = left_offset; + } + executed_block.insert({std::move(new_col), src_col->type, col_name}); + } + if (!executed_block) + { + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "required columns: [{}], but not found any in left/right table. right table: {}, left table: {}", + required_cols.toString(), + sample_right_block.dumpNames(), + added_columns.left_block.dumpNames()); + } + + for (const auto & col : executed_block.getColumnsWithTypeAndName()) + if (!col.column || !col.type) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Illegal nullptr column in input block: {}", executed_block.dumpStructure()); + + added_columns.additional_filter_expression->execute(executed_block); + result_column = executed_block.getByPosition(0).column->convertToFullColumnIfConst(); + executed_block.clear(); + } while (false); + + result_column = result_column->convertToFullIfNeeded(); + if (result_column->isNullable()) + { + /// Convert Nullable(UInt8) to UInt8 ensuring that nulls are zeros + /// Trying to avoid copying data, since we are the only owner of the column. + ColumnPtr mask_column = assert_cast(*result_column).getNullMapColumnPtr(); + + MutableColumnPtr mutable_column; + { + ColumnPtr nested_column = assert_cast(*result_column).getNestedColumnPtr(); + result_column.reset(); + mutable_column = IColumn::mutate(std::move(nested_column)); + } + + auto & column_data = assert_cast(*mutable_column).getData(); + const auto & mask_column_data = assert_cast(*mask_column).getData(); + for (size_t i = 0; i < column_data.size(); ++i) + { + if (mask_column_data[i]) + column_data[i] = 0; + } + return mutable_column; + } + return result_column; + } /// First to collect all matched rows refs by join keys, then filter out rows which are not true in additional filter expression. template @@ -141,7 +389,177 @@ private: AddedColumns & added_columns, JoinStuff::JoinUsedFlags & used_flags [[maybe_unused]], bool need_filter [[maybe_unused]], - bool flag_per_row [[maybe_unused]]); + bool need_flags [[maybe_unused]], + bool add_missing [[maybe_unused]], + bool flag_per_row [[maybe_unused]]) + { + size_t left_block_rows = added_columns.rows_to_add; + if (need_filter) + added_columns.filter = IColumn::Filter(left_block_rows, 0); + + std::unique_ptr pool; + + if constexpr (need_replication) + added_columns.offsets_to_replicate = std::make_unique(left_block_rows); + + std::vector row_replicate_offset; + row_replicate_offset.reserve(left_block_rows); + + using FindResult = typename KeyGetter::FindResult; + size_t max_joined_block_rows = added_columns.max_joined_block_rows; + size_t left_row_iter = 0; + PreSelectedRows selected_rows; + selected_rows.reserve(left_block_rows); + std::vector find_results; + find_results.reserve(left_block_rows); + bool exceeded_max_block_rows = false; + IColumn::Offset total_added_rows = 0; + IColumn::Offset current_added_rows = 0; + + auto collect_keys_matched_rows_refs = [&]() + { + pool = std::make_unique(); + find_results.clear(); + row_replicate_offset.clear(); + row_replicate_offset.push_back(0); + current_added_rows = 0; + selected_rows.clear(); + for (; left_row_iter < left_block_rows; ++left_row_iter) + { + if constexpr (need_replication) + { + if (unlikely(total_added_rows + current_added_rows >= max_joined_block_rows)) + { + break; + } + } + KnownRowsHolder all_flag_known_rows; + KnownRowsHolder single_flag_know_rows; + for (size_t join_clause_idx = 0; join_clause_idx < added_columns.join_on_keys.size(); ++join_clause_idx) + { + const auto & join_keys = added_columns.join_on_keys[join_clause_idx]; + if (join_keys.null_map && (*join_keys.null_map)[left_row_iter]) + continue; + + bool row_acceptable = !join_keys.isRowFiltered(left_row_iter); + auto find_result = row_acceptable + ? key_getter_vector[join_clause_idx].findKey(*(mapv[join_clause_idx]), left_row_iter, *pool) + : FindResult(); + + if (find_result.isFound()) + { + auto & mapped = find_result.getMapped(); + find_results.push_back(find_result); + if (flag_per_row) + addFoundRowAll(mapped, selected_rows, current_added_rows, all_flag_known_rows, nullptr); + else + addFoundRowAll(mapped, selected_rows, current_added_rows, single_flag_know_rows, nullptr); + } + } + row_replicate_offset.push_back(current_added_rows); + } + }; + + auto copy_final_matched_rows = [&](size_t left_start_row, ColumnPtr filter_col) + { + const PaddedPODArray & filter_flags = assert_cast(*filter_col).getData(); + + size_t prev_replicated_row = 0; + auto selected_right_row_it = selected_rows.begin(); + size_t find_result_index = 0; + for (size_t i = 1, n = row_replicate_offset.size(); i < n; ++i) + { + bool any_matched = false; + /// For all right join, flag_per_row is true, we need mark used flags for each row. + if (flag_per_row) + { + for (size_t replicated_row = prev_replicated_row; replicated_row < row_replicate_offset[i]; ++replicated_row) + { + if (filter_flags[replicated_row]) + { + any_matched = true; + added_columns.appendFromBlock(*selected_right_row_it, add_missing); + total_added_rows += 1; + if (need_flags) + used_flags.template setUsed((*selected_right_row_it)->block, (*selected_right_row_it)->row_num, 0); + } + ++selected_right_row_it; + } + } + else + { + for (size_t replicated_row = prev_replicated_row; replicated_row < row_replicate_offset[i]; ++replicated_row) + { + if (filter_flags[replicated_row]) + { + any_matched = true; + added_columns.appendFromBlock(*selected_right_row_it, add_missing); + total_added_rows += 1; + } + ++selected_right_row_it; + } + } + if (!any_matched) + { + if (add_missing) + addNotFoundRow(added_columns, total_added_rows); + else + addNotFoundRow(added_columns, total_added_rows); + } + else + { + if (!flag_per_row && need_flags) + used_flags.template setUsed(find_results[find_result_index]); + if (need_filter) + setUsed(added_columns.filter, left_start_row + i - 1); + if (add_missing) + added_columns.applyLazyDefaults(); + } + find_result_index += (prev_replicated_row != row_replicate_offset[i]); + + if constexpr (need_replication) + { + (*added_columns.offsets_to_replicate)[left_start_row + i - 1] = total_added_rows; + } + prev_replicated_row = row_replicate_offset[i]; + } + }; + + while (left_row_iter < left_block_rows && !exceeded_max_block_rows) + { + auto left_start_row = left_row_iter; + collect_keys_matched_rows_refs(); + if (selected_rows.size() != current_added_rows || row_replicate_offset.size() != left_row_iter - left_start_row + 1) + { + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Sizes are mismatched. selected_rows.size:{}, current_added_rows:{}, row_replicate_offset.size:{}, left_row_iter: {}, " + "left_start_row: {}", + selected_rows.size(), + current_added_rows, + row_replicate_offset.size(), + left_row_iter, + left_start_row); + } + auto filter_col = buildAdditionalFilter(left_start_row, selected_rows, row_replicate_offset, added_columns); + copy_final_matched_rows(left_start_row, filter_col); + + if constexpr (need_replication) + { + // Add a check for current_added_rows to avoid run the filter expression on too small size batch. + if (total_added_rows >= max_joined_block_rows || current_added_rows < 1024) + exceeded_max_block_rows = true; + } + } + + if constexpr (need_replication) + { + added_columns.offsets_to_replicate->resize_assume_reserved(left_row_iter); + added_columns.filter.resize_assume_reserved(left_row_iter); + } + added_columns.applyLazyDefaults(); + return left_row_iter; + } /// Cut first num_rows rows from block in place and returns block with remaining rows static Block sliceBlock(Block & block, size_t num_rows); diff --git a/src/Interpreters/IJoin.h b/src/Interpreters/IJoin.h index 7374348da50..8f648de2538 100644 --- a/src/Interpreters/IJoin.h +++ b/src/Interpreters/IJoin.h @@ -115,6 +115,7 @@ public: /// Peek next stream of delayed joined blocks. virtual IBlocksStreamPtr getDelayedBlocks() { return nullptr; } virtual bool hasDelayedBlocks() const { return false; } + virtual void tryRerangeRightTableData() {} virtual IBlocksStreamPtr getNonJoinedBlocks(const Block & left_sample_block, const Block & result_sample_block, UInt64 max_block_size) const = 0; diff --git a/src/Interpreters/RowRefs.h b/src/Interpreters/RowRefs.h index 7c98c47dd11..f8ac68191d6 100644 --- a/src/Interpreters/RowRefs.h +++ b/src/Interpreters/RowRefs.h @@ -123,6 +123,7 @@ struct RowRefList : RowRef RowRefList() {} /// NOLINT RowRefList(const Block * block_, size_t row_num_) : RowRef(block_, row_num_), rows(1) {} + RowRefList(const Block * block_, size_t row_start_, size_t rows_) : RowRef(block_, row_start_), rows(static_cast(rows_)) {} ForwardIterator begin() const { return ForwardIterator(this); } diff --git a/src/Processors/Transforms/JoiningTransform.cpp b/src/Processors/Transforms/JoiningTransform.cpp index ca204bcb482..f2fb6327129 100644 --- a/src/Processors/Transforms/JoiningTransform.cpp +++ b/src/Processors/Transforms/JoiningTransform.cpp @@ -299,13 +299,17 @@ IProcessor::Status FillingRightJoinSideTransform::prepare() void FillingRightJoinSideTransform::work() { - auto block = inputs.front().getHeader().cloneWithColumns(chunk.detachColumns()); + auto & input = inputs.front(); + auto block = input.getHeader().cloneWithColumns(chunk.detachColumns()); if (for_totals) join->setTotals(block); else stop_reading = !join->addBlockToJoin(block); + if (input.isFinished()) + join->tryRerangeRightTableData(); + set_totals = for_totals; }