add threshold for table rows

kevinyhzou 2024-05-10 12:18:06 +08:00
parent bc00f274aa
commit 37c3f4a870
10 changed files with 841 additions and 6 deletions

View File

@ -922,6 +922,8 @@ class IColumn;
M(Bool, implicit_transaction, false, "If enabled and not already inside a transaction, wraps the query inside a full transaction (begin + commit or rollback)", 0) \
M(UInt64, grace_hash_join_initial_buckets, 1, "Initial number of grace hash join buckets", 0) \
M(UInt64, grace_hash_join_max_buckets, 1024, "Limit on the number of grace hash join buckets", 0) \
M(Int32, join_to_sort_perkey_rows_threshold, 40, "The lower limit of per-key average rows in the right table to determine whether to sort it in hash join.", 0) \
M(Int32, join_to_sort_table_rows_threshold, 10000, "The upper limit of rows in the right table to determine whether to sort it in hash join.", 0) \
M(Timezone, session_timezone, "", "This setting can be removed in the future due to potential caveats. It is experimental and is not suitable for production usage. The default timezone for current session or query. The server default timezone if empty.", 0) \
M(Bool, use_hive_partitioning, false, "Allows to use hive partitioning for File, URL, S3, AzureBlobStorage and HDFS engines.", 0)\
\
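A note on how the two new settings interact (a minimal sketch with assumed helper names, not part of the commit): the re-range only kicks in when the right table is small enough overall and dense enough per key, mirroring the check added to HashJoin::tryRerangeRightTableData() further down in this diff.

    // Hypothetical helper, mirroring the gating condition in tryRerangeRightTableData():
    // sort the right table only if it is small in total and has many rows per key.
    #include <cstddef>
    static bool wouldRerangeRightTable(
        size_t right_rows,
        size_t right_keys,
        size_t table_rows_threshold,   /// join_to_sort_table_rows_threshold, default 10000
        size_t perkey_rows_threshold)  /// join_to_sort_perkey_rows_threshold, default 40
    {
        size_t avg_perkey_rows = right_keys ? right_rows / right_keys : 0;
        return right_rows <= table_rows_threshold && avg_perkey_rows >= perkey_rows_threshold;
    }

With the defaults, an 8000-row right table spread over 100 keys (80 rows per key on average) qualifies, while a 50000-row right table does not.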

View File

@ -516,6 +516,259 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"enable_optimize_predicate_expression", 0, 1, "Optimize predicates to subqueries by default"}
}
},
{"24.7", {{"output_format_parquet_write_page_index", false, true, "Add a possibility to write page index into parquet files."},
{"optimize_functions_to_subcolumns", false, true, "Enable optimization by default"},
{"input_format_json_ignore_key_case", false, false, "Ignore json key case while read json field from string."},
{"optimize_trivial_insert_select", true, false, "The optimization does not make sense in many cases."},
{"lightweight_mutation_projection_mode", "throw", "throw", "When lightweight delete happens on a table with projection(s), the possible operations include throw the exception as projection exists, or drop all projection related to this table then do lightweight delete."},
{"database_replicated_allow_heavy_create", true, false, "Long-running DDL queries (CREATE AS SELECT and POPULATE) for Replicated database engine was forbidden"},
{"query_plan_merge_filters", false, false, "Allow to merge filters in the query plan"},
{"azure_sdk_max_retries", 10, 10, "Maximum number of retries in azure sdk"},
{"azure_sdk_retry_initial_backoff_ms", 10, 10, "Minimal backoff between retries in azure sdk"},
{"azure_sdk_retry_max_backoff_ms", 1000, 1000, "Maximal backoff between retries in azure sdk"},
{"join_to_sort_perkey_rows_threshold", 0, 2, "The lower limit of per-key average rows in the right table to determine whether to sort it in hash join." },
{"join_to_sort_perkey_rows_threshold", 0, 40, "The lower limit of per-key average rows in the right table to determine whether to sort it in hash join." },
{"join_to_sort_table_rows_threshold", 0, 10000, "The upper limit of rows in the right table to determine whether to sort it in hash join." },
{"merge_tree_min_bytes_per_task_for_remote_reading", 4194304, 2097152, "Value is unified with `filesystem_prefetch_min_bytes_for_single_read_task`"},
{"ignore_on_cluster_for_replicated_named_collections_queries", false, false, "Ignore ON CLUSTER clause for replicated named collections management queries."},
{"backup_restore_s3_retry_attempts", 1000,1000, "Setting for Aws::Client::RetryStrategy, Aws::Client does retries itself, 0 means no retries. It takes place only for backup/restore."},
{"postgresql_connection_attempt_timeout", 2, 2, "Allow to control 'connect_timeout' parameter of PostgreSQL connection."},
{"postgresql_connection_pool_retries", 2, 2, "Allow to control the number of retries in PostgreSQL connection pool."}
}},
{"24.6", {{"materialize_skip_indexes_on_insert", true, true, "Added new setting to allow to disable materialization of skip indexes on insert"},
{"materialize_statistics_on_insert", true, true, "Added new setting to allow to disable materialization of statistics on insert"},
{"input_format_parquet_use_native_reader", false, false, "When reading Parquet files, to use native reader instead of arrow reader."},
{"hdfs_throw_on_zero_files_match", false, false, "Allow to throw an error when ListObjects request cannot match any files in HDFS engine instead of empty query result"},
{"azure_throw_on_zero_files_match", false, false, "Allow to throw an error when ListObjects request cannot match any files in AzureBlobStorage engine instead of empty query result"},
{"s3_validate_request_settings", true, true, "Allow to disable S3 request settings validation"},
{"allow_experimental_full_text_index", false, false, "Enable experimental full-text index"},
{"azure_skip_empty_files", false, false, "Allow to skip empty files in azure table engine"},
{"hdfs_ignore_file_doesnt_exist", false, false, "Allow to return 0 rows when the requested files don't exist instead of throwing an exception in HDFS table engine"},
{"azure_ignore_file_doesnt_exist", false, false, "Allow to return 0 rows when the requested files don't exist instead of throwing an exception in AzureBlobStorage table engine"},
{"s3_ignore_file_doesnt_exist", false, false, "Allow to return 0 rows when the requested files don't exist instead of throwing an exception in S3 table engine"},
{"s3_max_part_number", 10000, 10000, "Maximum part number number for s3 upload part"},
{"s3_max_single_operation_copy_size", 32 * 1024 * 1024, 32 * 1024 * 1024, "Maximum size for a single copy operation in s3"},
{"input_format_parquet_max_block_size", 8192, DEFAULT_BLOCK_SIZE, "Increase block size for parquet reader."},
{"input_format_parquet_prefer_block_bytes", 0, DEFAULT_BLOCK_SIZE * 256, "Average block bytes output by parquet reader."},
{"enable_blob_storage_log", true, true, "Write information about blob storage operations to system.blob_storage_log table"},
{"allow_deprecated_snowflake_conversion_functions", true, false, "Disabled deprecated functions snowflakeToDateTime[64] and dateTime[64]ToSnowflake."},
{"allow_statistic_optimize", false, false, "Old setting which popped up here being renamed."},
{"allow_experimental_statistic", false, false, "Old setting which popped up here being renamed."},
{"allow_statistics_optimize", false, false, "The setting was renamed. The previous name is `allow_statistic_optimize`."},
{"allow_experimental_statistics", false, false, "The setting was renamed. The previous name is `allow_experimental_statistic`."},
{"enable_vertical_final", false, true, "Enable vertical final by default again after fixing bug"},
{"parallel_replicas_custom_key_range_lower", 0, 0, "Add settings to control the range filter when using parallel replicas with dynamic shards"},
{"parallel_replicas_custom_key_range_upper", 0, 0, "Add settings to control the range filter when using parallel replicas with dynamic shards. A value of 0 disables the upper limit"},
{"output_format_pretty_display_footer_column_names", 0, 1, "Add a setting to display column names in the footer if there are many rows. Threshold value is controlled by output_format_pretty_display_footer_column_names_min_rows."},
{"output_format_pretty_display_footer_column_names_min_rows", 0, 50, "Add a setting to control the threshold value for setting output_format_pretty_display_footer_column_names_min_rows. Default 50."},
{"output_format_csv_serialize_tuple_into_separate_columns", true, true, "A new way of how interpret tuples in CSV format was added."},
{"input_format_csv_deserialize_separate_columns_into_tuple", true, true, "A new way of how interpret tuples in CSV format was added."},
{"input_format_csv_try_infer_strings_from_quoted_tuples", true, true, "A new way of how interpret tuples in CSV format was added."},
}},
{"24.5", {{"allow_deprecated_error_prone_window_functions", true, false, "Allow usage of deprecated error prone window functions (neighbor, runningAccumulate, runningDifferenceStartingWithFirstValue, runningDifference)"},
{"allow_experimental_join_condition", false, false, "Support join with inequal conditions which involve columns from both left and right table. e.g. t1.y < t2.y."},
{"input_format_tsv_crlf_end_of_line", false, false, "Enables reading of CRLF line endings with TSV formats"},
{"output_format_parquet_use_custom_encoder", false, true, "Enable custom Parquet encoder."},
{"cross_join_min_rows_to_compress", 0, 10000000, "Minimal count of rows to compress block in CROSS JOIN. Zero value means - disable this threshold. This block is compressed when any of the two thresholds (by rows or by bytes) are reached."},
{"cross_join_min_bytes_to_compress", 0, 1_GiB, "Minimal size of block to compress in CROSS JOIN. Zero value means - disable this threshold. This block is compressed when any of the two thresholds (by rows or by bytes) are reached."},
{"http_max_chunk_size", 0, 0, "Internal limitation"},
{"prefer_external_sort_block_bytes", 0, DEFAULT_BLOCK_SIZE * 256, "Prefer maximum block bytes for external sort, reduce the memory usage during merging."},
{"input_format_force_null_for_omitted_fields", false, false, "Disable type-defaults for omitted fields when needed"},
{"cast_string_to_dynamic_use_inference", false, false, "Add setting to allow converting String to Dynamic through parsing"},
{"allow_experimental_dynamic_type", false, false, "Add new experimental Dynamic type"},
{"azure_max_blocks_in_multipart_upload", 50000, 50000, "Maximum number of blocks in multipart upload for Azure."},
}},
{"24.4", {{"input_format_json_throw_on_bad_escape_sequence", true, true, "Allow to save JSON strings with bad escape sequences"},
{"max_parsing_threads", 0, 0, "Add a separate setting to control number of threads in parallel parsing from files"},
{"ignore_drop_queries_probability", 0, 0, "Allow to ignore drop queries in server with specified probability for testing purposes"},
{"lightweight_deletes_sync", 2, 2, "The same as 'mutation_sync', but controls only execution of lightweight deletes"},
{"query_cache_system_table_handling", "save", "throw", "The query cache no longer caches results of queries against system tables"},
{"input_format_json_ignore_unnecessary_fields", false, true, "Ignore unnecessary fields and not parse them. Enabling this may not throw exceptions on json strings of invalid format or with duplicated fields"},
{"input_format_hive_text_allow_variable_number_of_columns", false, true, "Ignore extra columns in Hive Text input (if file has more columns than expected) and treat missing fields in Hive Text input as default values."},
{"allow_experimental_database_replicated", false, true, "Database engine Replicated is now in Beta stage"},
{"temporary_data_in_cache_reserve_space_wait_lock_timeout_milliseconds", (10 * 60 * 1000), (10 * 60 * 1000), "Wait time to lock cache for sapce reservation in temporary data in filesystem cache"},
{"optimize_rewrite_sum_if_to_count_if", false, true, "Only available for the analyzer, where it works correctly"},
{"azure_allow_parallel_part_upload", "true", "true", "Use multiple threads for azure multipart upload."},
{"max_recursive_cte_evaluation_depth", DBMS_RECURSIVE_CTE_MAX_EVALUATION_DEPTH, DBMS_RECURSIVE_CTE_MAX_EVALUATION_DEPTH, "Maximum limit on recursive CTE evaluation depth"},
{"query_plan_convert_outer_join_to_inner_join", false, true, "Allow to convert OUTER JOIN to INNER JOIN if filter after JOIN always filters default values"},
}},
{"24.3", {{"s3_connect_timeout_ms", 1000, 1000, "Introduce new dedicated setting for s3 connection timeout"},
{"allow_experimental_shared_merge_tree", false, true, "The setting is obsolete"},
{"use_page_cache_for_disks_without_file_cache", false, false, "Added userspace page cache"},
{"read_from_page_cache_if_exists_otherwise_bypass_cache", false, false, "Added userspace page cache"},
{"page_cache_inject_eviction", false, false, "Added userspace page cache"},
{"default_table_engine", "None", "MergeTree", "Set default table engine to MergeTree for better usability"},
{"input_format_json_use_string_type_for_ambiguous_paths_in_named_tuples_inference_from_objects", false, false, "Allow to use String type for ambiguous paths during named tuple inference from JSON objects"},
{"traverse_shadow_remote_data_paths", false, false, "Traverse shadow directory when query system.remote_data_paths."},
{"throw_if_deduplication_in_dependent_materialized_views_enabled_with_async_insert", false, true, "Deduplication is dependent materialized view cannot work together with async inserts."},
{"parallel_replicas_allow_in_with_subquery", false, true, "If true, subquery for IN will be executed on every follower replica"},
{"log_processors_profiles", false, true, "Enable by default"},
{"function_locate_has_mysql_compatible_argument_order", false, true, "Increase compatibility with MySQL's locate function."},
{"allow_suspicious_primary_key", true, false, "Forbid suspicious PRIMARY KEY/ORDER BY for MergeTree (i.e. SimpleAggregateFunction)"},
{"filesystem_cache_reserve_space_wait_lock_timeout_milliseconds", 1000, 1000, "Wait time to lock cache for sapce reservation in filesystem cache"},
{"max_parser_backtracks", 0, 1000000, "Limiting the complexity of parsing"},
{"analyzer_compatibility_join_using_top_level_identifier", false, false, "Force to resolve identifier in JOIN USING from projection"},
{"distributed_insert_skip_read_only_replicas", false, false, "If true, INSERT into Distributed will skip read-only replicas"},
{"keeper_max_retries", 10, 10, "Max retries for general keeper operations"},
{"keeper_retry_initial_backoff_ms", 100, 100, "Initial backoff timeout for general keeper operations"},
{"keeper_retry_max_backoff_ms", 5000, 5000, "Max backoff timeout for general keeper operations"},
{"s3queue_allow_experimental_sharded_mode", false, false, "Enable experimental sharded mode of S3Queue table engine. It is experimental because it will be rewritten"},
{"allow_experimental_analyzer", false, true, "Enable analyzer and planner by default."},
{"merge_tree_read_split_ranges_into_intersecting_and_non_intersecting_injection_probability", 0.0, 0.0, "For testing of `PartsSplitter` - split read ranges into intersecting and non intersecting every time you read from MergeTree with the specified probability."},
{"allow_get_client_http_header", false, false, "Introduced a new function."},
{"output_format_pretty_row_numbers", false, true, "It is better for usability."},
{"output_format_pretty_max_value_width_apply_for_single_value", true, false, "Single values in Pretty formats won't be cut."},
{"output_format_parquet_string_as_string", false, true, "ClickHouse allows arbitrary binary data in the String data type, which is typically UTF-8. Parquet/ORC/Arrow Strings only support UTF-8. That's why you can choose which Arrow's data type to use for the ClickHouse String data type - String or Binary. While Binary would be more correct and compatible, using String by default will correspond to user expectations in most cases."},
{"output_format_orc_string_as_string", false, true, "ClickHouse allows arbitrary binary data in the String data type, which is typically UTF-8. Parquet/ORC/Arrow Strings only support UTF-8. That's why you can choose which Arrow's data type to use for the ClickHouse String data type - String or Binary. While Binary would be more correct and compatible, using String by default will correspond to user expectations in most cases."},
{"output_format_arrow_string_as_string", false, true, "ClickHouse allows arbitrary binary data in the String data type, which is typically UTF-8. Parquet/ORC/Arrow Strings only support UTF-8. That's why you can choose which Arrow's data type to use for the ClickHouse String data type - String or Binary. While Binary would be more correct and compatible, using String by default will correspond to user expectations in most cases."},
{"output_format_parquet_compression_method", "lz4", "zstd", "Parquet/ORC/Arrow support many compression methods, including lz4 and zstd. ClickHouse supports each and every compression method. Some inferior tools, such as 'duckdb', lack support for the faster `lz4` compression method, that's why we set zstd by default."},
{"output_format_orc_compression_method", "lz4", "zstd", "Parquet/ORC/Arrow support many compression methods, including lz4 and zstd. ClickHouse supports each and every compression method. Some inferior tools, such as 'duckdb', lack support for the faster `lz4` compression method, that's why we set zstd by default."},
{"output_format_pretty_highlight_digit_groups", false, true, "If enabled and if output is a terminal, highlight every digit corresponding to the number of thousands, millions, etc. with underline."},
{"geo_distance_returns_float64_on_float64_arguments", false, true, "Increase the default precision."},
{"azure_max_inflight_parts_for_one_file", 20, 20, "The maximum number of a concurrent loaded parts in multipart upload request. 0 means unlimited."},
{"azure_strict_upload_part_size", 0, 0, "The exact size of part to upload during multipart upload to Azure blob storage."},
{"azure_min_upload_part_size", 16*1024*1024, 16*1024*1024, "The minimum size of part to upload during multipart upload to Azure blob storage."},
{"azure_max_upload_part_size", 5ull*1024*1024*1024, 5ull*1024*1024*1024, "The maximum size of part to upload during multipart upload to Azure blob storage."},
{"azure_upload_part_size_multiply_factor", 2, 2, "Multiply azure_min_upload_part_size by this factor each time azure_multiply_parts_count_threshold parts were uploaded from a single write to Azure blob storage."},
{"azure_upload_part_size_multiply_parts_count_threshold", 500, 500, "Each time this number of parts was uploaded to Azure blob storage, azure_min_upload_part_size is multiplied by azure_upload_part_size_multiply_factor."},
{"output_format_csv_serialize_tuple_into_separate_columns", true, true, "A new way of how interpret tuples in CSV format was added."},
{"input_format_csv_deserialize_separate_columns_into_tuple", true, true, "A new way of how interpret tuples in CSV format was added."},
{"input_format_csv_try_infer_strings_from_quoted_tuples", true, true, "A new way of how interpret tuples in CSV format was added."},
}},
{"24.2", {{"allow_suspicious_variant_types", true, false, "Don't allow creating Variant type with suspicious variants by default"},
{"validate_experimental_and_suspicious_types_inside_nested_types", false, true, "Validate usage of experimental and suspicious types inside nested types"},
{"output_format_values_escape_quote_with_quote", false, false, "If true escape ' with '', otherwise quoted with \\'"},
{"output_format_pretty_single_large_number_tip_threshold", 0, 1'000'000, "Print a readable number tip on the right side of the table if the block consists of a single number which exceeds this value (except 0)"},
{"input_format_try_infer_exponent_floats", true, false, "Don't infer floats in exponential notation by default"},
{"query_plan_optimize_prewhere", true, true, "Allow to push down filter to PREWHERE expression for supported storages"},
{"async_insert_max_data_size", 1000000, 10485760, "The previous value appeared to be too small."},
{"async_insert_poll_timeout_ms", 10, 10, "Timeout in milliseconds for polling data from asynchronous insert queue"},
{"async_insert_use_adaptive_busy_timeout", false, true, "Use adaptive asynchronous insert timeout"},
{"async_insert_busy_timeout_min_ms", 50, 50, "The minimum value of the asynchronous insert timeout in milliseconds; it also serves as the initial value, which may be increased later by the adaptive algorithm"},
{"async_insert_busy_timeout_max_ms", 200, 200, "The minimum value of the asynchronous insert timeout in milliseconds; async_insert_busy_timeout_ms is aliased to async_insert_busy_timeout_max_ms"},
{"async_insert_busy_timeout_increase_rate", 0.2, 0.2, "The exponential growth rate at which the adaptive asynchronous insert timeout increases"},
{"async_insert_busy_timeout_decrease_rate", 0.2, 0.2, "The exponential growth rate at which the adaptive asynchronous insert timeout decreases"},
{"format_template_row_format", "", "", "Template row format string can be set directly in query"},
{"format_template_resultset_format", "", "", "Template result set format string can be set in query"},
{"split_parts_ranges_into_intersecting_and_non_intersecting_final", true, true, "Allow to split parts ranges into intersecting and non intersecting during FINAL optimization"},
{"split_intersecting_parts_ranges_into_layers_final", true, true, "Allow to split intersecting parts ranges into layers during FINAL optimization"},
{"azure_max_single_part_copy_size", 256*1024*1024, 256*1024*1024, "The maximum size of object to copy using single part copy to Azure blob storage."},
{"min_external_table_block_size_rows", DEFAULT_INSERT_BLOCK_SIZE, DEFAULT_INSERT_BLOCK_SIZE, "Squash blocks passed to external table to specified size in rows, if blocks are not big enough"},
{"min_external_table_block_size_bytes", DEFAULT_INSERT_BLOCK_SIZE * 256, DEFAULT_INSERT_BLOCK_SIZE * 256, "Squash blocks passed to external table to specified size in bytes, if blocks are not big enough."},
{"parallel_replicas_prefer_local_join", true, true, "If true, and JOIN can be executed with parallel replicas algorithm, and all storages of right JOIN part are *MergeTree, local JOIN will be used instead of GLOBAL JOIN."},
{"optimize_time_filter_with_preimage", true, true, "Optimize Date and DateTime predicates by converting functions into equivalent comparisons without conversions (e.g. toYear(col) = 2023 -> col >= '2023-01-01' AND col <= '2023-12-31')"},
{"extract_key_value_pairs_max_pairs_per_row", 0, 0, "Max number of pairs that can be produced by the `extractKeyValuePairs` function. Used as a safeguard against consuming too much memory."},
{"default_view_definer", "CURRENT_USER", "CURRENT_USER", "Allows to set default `DEFINER` option while creating a view"},
{"default_materialized_view_sql_security", "DEFINER", "DEFINER", "Allows to set a default value for SQL SECURITY option when creating a materialized view"},
{"default_normal_view_sql_security", "INVOKER", "INVOKER", "Allows to set default `SQL SECURITY` option while creating a normal view"},
{"mysql_map_string_to_text_in_show_columns", false, true, "Reduce the configuration effort to connect ClickHouse with BI tools."},
{"mysql_map_fixed_string_to_text_in_show_columns", false, true, "Reduce the configuration effort to connect ClickHouse with BI tools."},
}},
{"24.1", {{"print_pretty_type_names", false, true, "Better user experience."},
{"input_format_json_read_bools_as_strings", false, true, "Allow to read bools as strings in JSON formats by default"},
{"output_format_arrow_use_signed_indexes_for_dictionary", false, true, "Use signed indexes type for Arrow dictionaries by default as it's recommended"},
{"allow_experimental_variant_type", false, false, "Add new experimental Variant type"},
{"use_variant_as_common_type", false, false, "Allow to use Variant in if/multiIf if there is no common type"},
{"output_format_arrow_use_64_bit_indexes_for_dictionary", false, false, "Allow to use 64 bit indexes type in Arrow dictionaries"},
{"parallel_replicas_mark_segment_size", 128, 128, "Add new setting to control segment size in new parallel replicas coordinator implementation"},
{"ignore_materialized_views_with_dropped_target_table", false, false, "Add new setting to allow to ignore materialized views with dropped target table"},
{"output_format_compression_level", 3, 3, "Allow to change compression level in the query output"},
{"output_format_compression_zstd_window_log", 0, 0, "Allow to change zstd window log in the query output when zstd compression is used"},
{"enable_zstd_qat_codec", false, false, "Add new ZSTD_QAT codec"},
{"enable_vertical_final", false, true, "Use vertical final by default"},
{"output_format_arrow_use_64_bit_indexes_for_dictionary", false, false, "Allow to use 64 bit indexes type in Arrow dictionaries"},
{"max_rows_in_set_to_optimize_join", 100000, 0, "Disable join optimization as it prevents from read in order optimization"},
{"output_format_pretty_color", true, "auto", "Setting is changed to allow also for auto value, disabling ANSI escapes if output is not a tty"},
{"function_visible_width_behavior", 0, 1, "We changed the default behavior of `visibleWidth` to be more precise"},
{"max_estimated_execution_time", 0, 0, "Separate max_execution_time and max_estimated_execution_time"},
{"iceberg_engine_ignore_schema_evolution", false, false, "Allow to ignore schema evolution in Iceberg table engine"},
{"optimize_injective_functions_in_group_by", false, true, "Replace injective functions by it's arguments in GROUP BY section in analyzer"},
{"update_insert_deduplication_token_in_dependent_materialized_views", false, false, "Allow to update insert deduplication token with table identifier during insert in dependent materialized views"},
{"azure_max_unexpected_write_error_retries", 4, 4, "The maximum number of retries in case of unexpected errors during Azure blob storage write"},
{"split_parts_ranges_into_intersecting_and_non_intersecting_final", false, true, "Allow to split parts ranges into intersecting and non intersecting during FINAL optimization"},
{"split_intersecting_parts_ranges_into_layers_final", true, true, "Allow to split intersecting parts ranges into layers during FINAL optimization"}}},
{"23.12", {{"allow_suspicious_ttl_expressions", true, false, "It is a new setting, and in previous versions the behavior was equivalent to allowing."},
{"input_format_parquet_allow_missing_columns", false, true, "Allow missing columns in Parquet files by default"},
{"input_format_orc_allow_missing_columns", false, true, "Allow missing columns in ORC files by default"},
{"input_format_arrow_allow_missing_columns", false, true, "Allow missing columns in Arrow files by default"}}},
{"23.11", {{"parsedatetime_parse_without_leading_zeros", false, true, "Improved compatibility with MySQL DATE_FORMAT/STR_TO_DATE"}}},
{"23.9", {{"optimize_group_by_constant_keys", false, true, "Optimize group by constant keys by default"},
{"input_format_json_try_infer_named_tuples_from_objects", false, true, "Try to infer named Tuples from JSON objects by default"},
{"input_format_json_read_numbers_as_strings", false, true, "Allow to read numbers as strings in JSON formats by default"},
{"input_format_json_read_arrays_as_strings", false, true, "Allow to read arrays as strings in JSON formats by default"},
{"input_format_json_infer_incomplete_types_as_strings", false, true, "Allow to infer incomplete types as Strings in JSON formats by default"},
{"input_format_json_try_infer_numbers_from_strings", true, false, "Don't infer numbers from strings in JSON formats by default to prevent possible parsing errors"},
{"http_write_exception_in_output_format", false, true, "Output valid JSON/XML on exception in HTTP streaming."}}},
{"23.8", {{"rewrite_count_distinct_if_with_count_distinct_implementation", false, true, "Rewrite countDistinctIf with count_distinct_implementation configuration"}}},
{"23.7", {{"function_sleep_max_microseconds_per_block", 0, 3000000, "In previous versions, the maximum sleep time of 3 seconds was applied only for `sleep`, but not for `sleepEachRow` function. In the new version, we introduce this setting. If you set compatibility with the previous versions, we will disable the limit altogether."}}},
{"23.6", {{"http_send_timeout", 180, 30, "3 minutes seems crazy long. Note that this is timeout for a single network write call, not for the whole upload operation."},
{"http_receive_timeout", 180, 30, "See http_send_timeout."}}},
{"23.5", {{"input_format_parquet_preserve_order", true, false, "Allow Parquet reader to reorder rows for better parallelism."},
{"parallelize_output_from_storages", false, true, "Allow parallelism when executing queries that read from file/url/s3/etc. This may reorder rows."},
{"use_with_fill_by_sorting_prefix", false, true, "Columns preceding WITH FILL columns in ORDER BY clause form sorting prefix. Rows with different values in sorting prefix are filled independently"},
{"output_format_parquet_compliant_nested_types", false, true, "Change an internal field name in output Parquet file schema."}}},
{"23.4", {{"allow_suspicious_indices", true, false, "If true, index can defined with identical expressions"},
{"allow_nonconst_timezone_arguments", true, false, "Allow non-const timezone arguments in certain time-related functions like toTimeZone(), fromUnixTimestamp*(), snowflakeToDateTime*()."},
{"connect_timeout_with_failover_ms", 50, 1000, "Increase default connect timeout because of async connect"},
{"connect_timeout_with_failover_secure_ms", 100, 1000, "Increase default secure connect timeout because of async connect"},
{"hedged_connection_timeout_ms", 100, 50, "Start new connection in hedged requests after 50 ms instead of 100 to correspond with previous connect timeout"},
{"formatdatetime_f_prints_single_zero", true, false, "Improved compatibility with MySQL DATE_FORMAT()/STR_TO_DATE()"},
{"formatdatetime_parsedatetime_m_is_month_name", false, true, "Improved compatibility with MySQL DATE_FORMAT/STR_TO_DATE"}}},
{"23.3", {{"output_format_parquet_version", "1.0", "2.latest", "Use latest Parquet format version for output format"},
{"input_format_json_ignore_unknown_keys_in_named_tuple", false, true, "Improve parsing JSON objects as named tuples"},
{"input_format_native_allow_types_conversion", false, true, "Allow types conversion in Native input forma"},
{"output_format_arrow_compression_method", "none", "lz4_frame", "Use lz4 compression in Arrow output format by default"},
{"output_format_parquet_compression_method", "snappy", "lz4", "Use lz4 compression in Parquet output format by default"},
{"output_format_orc_compression_method", "none", "lz4_frame", "Use lz4 compression in ORC output format by default"},
{"async_query_sending_for_remote", false, true, "Create connections and send query async across shards"}}},
{"23.2", {{"output_format_parquet_fixed_string_as_fixed_byte_array", false, true, "Use Parquet FIXED_LENGTH_BYTE_ARRAY type for FixedString by default"},
{"output_format_arrow_fixed_string_as_fixed_byte_array", false, true, "Use Arrow FIXED_SIZE_BINARY type for FixedString by default"},
{"query_plan_remove_redundant_distinct", false, true, "Remove redundant Distinct step in query plan"},
{"optimize_duplicate_order_by_and_distinct", true, false, "Remove duplicate ORDER BY and DISTINCT if it's possible"},
{"insert_keeper_max_retries", 0, 20, "Enable reconnections to Keeper on INSERT, improve reliability"}}},
{"23.1", {{"input_format_json_read_objects_as_strings", 0, 1, "Enable reading nested json objects as strings while object type is experimental"},
{"input_format_json_defaults_for_missing_elements_in_named_tuple", false, true, "Allow missing elements in JSON objects while reading named tuples by default"},
{"input_format_csv_detect_header", false, true, "Detect header in CSV format by default"},
{"input_format_tsv_detect_header", false, true, "Detect header in TSV format by default"},
{"input_format_custom_detect_header", false, true, "Detect header in CustomSeparated format by default"},
{"query_plan_remove_redundant_sorting", false, true, "Remove redundant sorting in query plan. For example, sorting steps related to ORDER BY clauses in subqueries"}}},
{"22.12", {{"max_size_to_preallocate_for_aggregation", 10'000'000, 100'000'000, "This optimizes performance"},
{"query_plan_aggregation_in_order", 0, 1, "Enable some refactoring around query plan"},
{"format_binary_max_string_size", 0, 1_GiB, "Prevent allocating large amount of memory"}}},
{"22.11", {{"use_structure_from_insertion_table_in_table_functions", 0, 2, "Improve using structure from insertion table in table functions"}}},
{"22.9", {{"force_grouping_standard_compatibility", false, true, "Make GROUPING function output the same as in SQL standard and other DBMS"}}},
{"22.7", {{"cross_to_inner_join_rewrite", 1, 2, "Force rewrite comma join to inner"},
{"enable_positional_arguments", false, true, "Enable positional arguments feature by default"},
{"format_csv_allow_single_quotes", true, false, "Most tools don't treat single quote in CSV specially, don't do it by default too"}}},
{"22.6", {{"output_format_json_named_tuples_as_objects", false, true, "Allow to serialize named tuples as JSON objects in JSON formats by default"},
{"input_format_skip_unknown_fields", false, true, "Optimize reading subset of columns for some input formats"}}},
{"22.5", {{"memory_overcommit_ratio_denominator", 0, 1073741824, "Enable memory overcommit feature by default"},
{"memory_overcommit_ratio_denominator_for_user", 0, 1073741824, "Enable memory overcommit feature by default"}}},
{"22.4", {{"allow_settings_after_format_in_insert", true, false, "Do not allow SETTINGS after FORMAT for INSERT queries because ClickHouse interpret SETTINGS as some values, which is misleading"}}},
{"22.3", {{"cast_ipv4_ipv6_default_on_conversion_error", true, false, "Make functions cast(value, 'IPv4') and cast(value, 'IPv6') behave same as toIPv4 and toIPv6 functions"}}},
{"21.12", {{"stream_like_engine_allow_direct_select", true, false, "Do not allow direct select for Kafka/RabbitMQ/FileLog by default"}}},
{"21.9", {{"output_format_decimal_trailing_zeros", true, false, "Do not output trailing zeros in text representation of Decimal types by default for better looking output"},
{"use_hedged_requests", false, true, "Enable Hedged Requests feature by default"}}},
{"21.7", {{"legacy_column_name_of_tuple_literal", true, false, "Add this setting only for compatibility reasons. It makes sense to set to 'true', while doing rolling update of cluster from version lower than 21.7 to higher"}}},
{"21.5", {{"async_socket_for_remote", false, true, "Fix all problems and turn on asynchronous reads from socket for remote queries by default again"}}},
{"21.3", {{"async_socket_for_remote", true, false, "Turn off asynchronous reads from socket for remote queries because of some problems"},
{"optimize_normalize_count_variants", false, true, "Rewrite aggregate functions that semantically equals to count() as count() by default"},
{"normalize_function_names", false, true, "Normalize function names to their canonical names, this was needed for projection query routing"}}},
{"21.2", {{"enable_global_with_statement", false, true, "Propagate WITH statements to UNION queries and all subqueries by default"}}},
{"21.1", {{"insert_quorum_parallel", false, true, "Use parallel quorum inserts by default. It is significantly more convenient to use than sequential quorum inserts"},
{"input_format_null_as_default", false, true, "Allow to insert NULL as default for input formats by default"},
{"optimize_on_insert", false, true, "Enable data optimization on INSERT by default for better user experience"},
{"use_compact_format_in_distributed_parts_names", false, true, "Use compact format for async INSERT into Distributed tables by default"}}},
{"20.10", {{"format_regexp_escaping_rule", "Escaped", "Raw", "Use Raw as default escaping rule for Regexp format to male the behaviour more like to what users expect"}}},
{"20.7", {{"show_table_uuid_in_table_create_query_if_not_nil", true, false, "Stop showing UID of the table in its CREATE query for Engine=Atomic"}}},
{"20.5", {{"input_format_with_names_use_header", false, true, "Enable using header with names for formats with WithNames/WithNamesAndTypes suffixes"},
{"allow_suspicious_codecs", true, false, "Don't allow to specify meaningless compression codecs"}}},
{"20.4", {{"validate_polygons", false, true, "Throw exception if polygon is invalid in function pointInPolygon by default instead of returning possibly wrong results"}}},
{"19.18", {{"enable_scalar_subquery_optimization", false, true, "Prevent scalar subqueries from (de)serializing large scalar values and possibly avoid running the same subquery more than once"}}},
{"19.14", {{"any_join_distinct_right_table_keys", true, false, "Disable ANY RIGHT and ANY FULL JOINs by default to avoid inconsistency"}}},
{"19.12", {{"input_format_defaults_for_omitted_fields", false, true, "Enable calculation of complex default expressions for omitted fields for some input formats, because it should be the expected behaviour"}}},
{"19.5", {{"max_partitions_per_insert_block", 0, 100, "Add a limit for the number of partitions in one block"}}},
{"18.12.17", {{"enable_optimize_predicate_expression", 0, 1, "Optimize predicates to subqueries by default"}}},
};

View File

@ -20,10 +20,13 @@ void AddedColumns<false>::buildOutput() {}
template<>
void AddedColumns<false>::buildJoinGetOutput() {}
template<>
template<bool from_row_list>
void AddedColumns<false>::buildOutputFromBlocks() {}
template<>
void AddedColumns<true>::buildOutput()
@ -32,9 +35,15 @@ void AddedColumns<true>::buildOutput()
buildOutputFromBlocks<false>();
else
{
if (join_data_avg_perkey_rows < output_by_row_list_threshold)
buildOutputFromBlocks<true>();
else if (join_data_sorted)
{
for (size_t i = 0; i < this->size(); ++i)
{
@ -44,14 +53,19 @@ void AddedColumns<true>::buildOutput()
if (row_ref_i)
{
const RowRefList * row_ref_list = reinterpret_cast<const RowRefList *>(row_ref_i);
col->insertRangeFrom(*row_ref_list->block->getByPosition(right_indexes[i]).column, row_ref_list->row_num, row_ref_list->rows);
}
else
type_name[i].type->insertDefaultInto(*col);
}
}
}
else
{
for (size_t i = 0; i < this->size(); ++i)
{
auto & col = columns[i];
for (auto row_ref_i : lazy_output.row_refs)
{
if (row_ref_i)
{
const RowRefList * row_ref_list = reinterpret_cast<const RowRefList *>(row_ref_i);
for (auto it = row_ref_list->begin(); it.ok(); ++it)
col->insertFrom(*it->block->getByPosition(right_indexes[i]).column, it->row_num);
}
else
type_name[i].type->insertDefaultInto(*col);
}
}
}
}
}
@ -74,6 +88,25 @@ void AddedColumns<true>::buildJoinGetOutput()
nullable_col->insertFromNotNullable(*column_from_block.column, row_ref->row_num);
else
col->insertFrom(*column_from_block.column, row_ref->row_num);
}
}
}
@ -82,7 +115,11 @@ template<>
template<bool from_row_list>
void AddedColumns<true>::buildOutputFromBlocks()
{
if (this->size() == 0)
return;
std::vector<const Block *> blocks;
std::vector<UInt32> row_nums;
@ -123,6 +160,32 @@ void AddedColumns<true>::buildOutputFromBlocks()
col->insertFrom(*blocks[j]->getByPosition(right_indexes[i]).column, row_nums[j]);
else
type_name[i].type->insertDefaultInto(*col);
}
}
}
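Aside (an illustrative sketch, not part of the diff): the payoff of the sorted path above is that each RowRefList now points at one contiguous [row_num, row_num + rows) range of a single block, so the output column can be filled with a single range copy instead of a per-row loop over scattered rows. A plain-C++ analogue of the two access patterns, with hypothetical names:

    #include <cstddef>
    #include <vector>

    // Analogue of insertRangeFrom on a contiguous range: one bulk append.
    void appendRange(std::vector<int> & dst, const std::vector<int> & src, size_t start, size_t rows)
    {
        dst.insert(dst.end(), src.begin() + start, src.begin() + start + rows);
    }

    // Analogue of repeated insertFrom over scattered row numbers: pointer chasing
    // across blocks and poor cache locality when the rows are far apart.
    void appendPerRow(std::vector<int> & dst, const std::vector<int> & src, const std::vector<size_t> & row_nums)
    {
        for (size_t row : row_nums)
            dst.push_back(src[row]);
    }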

View File

@ -196,6 +196,12 @@ private:
}
}
/** Build output from the blocks extracted from `RowRef` or `RowRefList`, to avoid block cache misses which may slow down performance.
* This problem would happen if we built output directly from `RowRef` or `RowRefList`.
*/
template<bool from_row_list>
void buildOutputFromBlocks();
MutableColumns columns;
bool is_join_get;
std::vector<size_t> right_indexes;

View File

@ -649,7 +649,6 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
}
data->keys_to_join = total_rows;
shrinkStoredBlocksToFit(total_bytes);
return table_join->sizeLimits().check(total_rows, total_bytes, "JOIN", ErrorCodes::SET_SIZE_LIMIT_EXCEEDED);
}
@ -1361,4 +1360,87 @@ bool HashJoin::needUsedFlagsForPerRightTableRow(std::shared_ptr<TableJoin> table
return false;
}
template <JoinKind KIND, typename Map, JoinStrictness STRICTNESS>
void HashJoin::tryRerangeRightTableDataImpl(Map & map [[maybe_unused]])
{
constexpr JoinFeatures<KIND, STRICTNESS> join_features;
if constexpr (join_features.is_all_join && (join_features.left || join_features.inner))
{
auto merge_rows_into_one_block = [&](BlocksList & blocks, RowRefList & rows_ref)
{
auto it = rows_ref.begin();
if (it.ok())
{
if (blocks.empty() || blocks.back().rows() > DEFAULT_BLOCK_SIZE)
blocks.emplace_back(it->block->cloneEmpty());
}
else
{
return;
}
auto & block = blocks.back();
size_t start_row = block.rows();
for (; it.ok(); ++it)
{
for (size_t i = 0; i < block.columns(); ++i)
{
auto & col = *(block.getByPosition(i).column->assumeMutable());
col.insertFrom(*it->block->getByPosition(i).column, it->row_num);
}
}
if (block.rows() > start_row)
{
RowRefList new_rows_ref(&block, start_row, block.rows() - start_row);
rows_ref = std::move(new_rows_ref);
}
};
auto visit_rows_map = [&](BlocksList & blocks, MapsAll & rows_map)
{
switch (data->type)
{
#define M(TYPE) \
case Type::TYPE: \
{\
rows_map.TYPE->forEachMapped([&](RowRefList & rows_ref) { merge_rows_into_one_block(blocks, rows_ref); }); \
break; \
}
APPLY_FOR_JOIN_VARIANTS(M)
#undef M
default:
break;
}
};
BlocksList sorted_blocks;
visit_rows_map(sorted_blocks, map);
data->blocks.swap(sorted_blocks);
}
}
void HashJoin::tryRerangeRightTableData()
{
if ((kind != JoinKind::Inner && kind != JoinKind::Left) || strictness != JoinStrictness::All || table_join->getMixedJoinExpression())
return;
if (!data || data->sorted || data->blocks.empty() || data->maps.size() > 1)
return;
if (data->keys_to_join == 0)
data->keys_to_join = getTotalRowCount();
if (sample_block_with_columns_to_add.columns() == 0 || data->rows_to_join > table_join->sortRightTableRowsThreshold() || data->avgPerKeyRows() < table_join->sortRightPerkeyRowsThreshold())
{
LOG_DEBUG(log, "The joined right table total rows :{}, total keys :{}, columns added:{}",
data->rows_to_join, data->keys_to_join, sample_block_with_columns_to_add.columns());
return;
}
std::cout << "sort right table rows" << std::endl;
joinDispatch(
kind,
strictness,
data->maps.front(),
[&](auto kind_, auto strictness_, auto & map_) { tryRerangeRightTableDataImpl<kind_, decltype(map_), strictness_>(map_); });
std::cout << "sort right finished" << std::endl;
data->sorted = true;
}
}
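Aside (a simplified analogue with assumed names, not ClickHouse code): conceptually, tryRerangeRightTableDataImpl rewrites the stored right-table blocks so that all rows sharing a join key become one contiguous run, and each RowRefList is repointed at that run. Stripped of the Block/Arena machinery, the transformation is roughly:

    #include <cstddef>
    #include <map>
    #include <utility>
    #include <vector>

    struct Range { size_t start = 0; size_t rows = 0; };  /// analogue of RowRefList(block, row_start, rows)

    /// Rebuild storage so that the rows of each key are contiguous, and return
    /// per-key ranges into the rebuilt storage.
    std::pair<std::vector<int>, std::map<int, Range>> rerangeByKey(const std::map<int, std::vector<int>> & rows_by_key)
    {
        std::vector<int> storage;
        std::map<int, Range> ranges;
        for (const auto & [key, values] : rows_by_key)
        {
            ranges[key] = Range{storage.size(), values.size()};
            storage.insert(storage.end(), values.begin(), values.end());  /// one contiguous run per key
        }
        return {std::move(storage), std::move(ranges)};
    }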

View File

@ -345,11 +345,12 @@ public:
size_t blocks_allocated_size = 0;
size_t blocks_nullmaps_allocated_size = 0;
/// Number of rows of right table to join
size_t rows_to_join = 0;
/// Number of keys of right table to join
size_t keys_to_join = 0;
/// Whether the right table reranged by key
bool sorted = false;
size_t avgPerKeyRows() const
{
@ -465,6 +466,10 @@ private:
void validateAdditionalFilterExpression(std::shared_ptr<ExpressionActions> additional_filter_expression);
bool needUsedFlagsForPerRightTableRow(std::shared_ptr<TableJoin> table_join_) const;
void tryRerangeRightTableData() override;
template <JoinKind KIND, typename Map, JoinStrictness STRICTNESS>
void tryRerangeRightTableDataImpl(Map & map);
};
}
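The body of avgPerKeyRows() is cut off by the hunk above; presumably it is just the ratio of the two counters introduced here, something like the sketch below (assumed, not shown in this diff):

    /// Presumed shape of the helper: average number of right-table rows per join key.
    size_t avgPerKeyRows() const
    {
        if (keys_to_join == 0)
            return 0;
        return rows_to_join / keys_to_join;
    }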

View File

@ -121,7 +121,142 @@ private:
std::vector<KeyGetter> && key_getter_vector,
const std::vector<const Map *> & mapv,
AddedColumns & added_columns,
JoinStuff::JoinUsedFlags & used_flags);
JoinStuff::JoinUsedFlags & used_flags)
{
constexpr JoinFeatures<KIND, STRICTNESS> join_features;
size_t rows = added_columns.rows_to_add;
if constexpr (need_filter)
added_columns.filter = IColumn::Filter(rows, 0);
if constexpr (!flag_per_row && (STRICTNESS == JoinStrictness::All || (STRICTNESS == JoinStrictness::Semi && KIND == JoinKind::Right)))
added_columns.output_by_row_list = true;
Arena pool;
if constexpr (join_features.need_replication)
added_columns.offsets_to_replicate = std::make_unique<IColumn::Offsets>(rows);
IColumn::Offset current_offset = 0;
size_t max_joined_block_rows = added_columns.max_joined_block_rows;
size_t i = 0;
for (; i < rows; ++i)
{
if constexpr (join_features.need_replication)
{
if (unlikely(current_offset >= max_joined_block_rows))
{
added_columns.offsets_to_replicate->resize_assume_reserved(i);
added_columns.filter.resize_assume_reserved(i);
break;
}
}
bool right_row_found = false;
KnownRowsHolder<flag_per_row> known_rows;
for (size_t onexpr_idx = 0; onexpr_idx < added_columns.join_on_keys.size(); ++onexpr_idx)
{
const auto & join_keys = added_columns.join_on_keys[onexpr_idx];
if (join_keys.null_map && (*join_keys.null_map)[i])
continue;
bool row_acceptable = !join_keys.isRowFiltered(i);
using FindResult = typename KeyGetter::FindResult;
auto find_result = row_acceptable ? key_getter_vector[onexpr_idx].findKey(*(mapv[onexpr_idx]), i, pool) : FindResult();
if (find_result.isFound())
{
right_row_found = true;
auto & mapped = find_result.getMapped();
if constexpr (join_features.is_asof_join)
{
const IColumn & left_asof_key = added_columns.leftAsofKey();
auto row_ref = mapped->findAsof(left_asof_key, i);
if (row_ref && row_ref->block)
{
setUsed<need_filter>(added_columns.filter, i);
if constexpr (flag_per_row)
used_flags.template setUsed<join_features.need_flags, flag_per_row>(row_ref->block, row_ref->row_num, 0);
else
used_flags.template setUsed<join_features.need_flags, flag_per_row>(find_result);
added_columns.appendFromBlock(row_ref, join_features.add_missing);
}
else
addNotFoundRow<join_features.add_missing, join_features.need_replication>(added_columns, current_offset);
}
else if constexpr (join_features.is_all_join)
{
setUsed<need_filter>(added_columns.filter, i);
used_flags.template setUsed<join_features.need_flags, flag_per_row>(find_result);
auto used_flags_opt = join_features.need_flags ? &used_flags : nullptr;
addFoundRowAll<Map, join_features.add_missing>(mapped, added_columns, current_offset, known_rows, used_flags_opt);
}
else if constexpr ((join_features.is_any_join || join_features.is_semi_join) && join_features.right)
{
/// Use first appeared left key + it needs left columns replication
bool used_once = used_flags.template setUsedOnce<join_features.need_flags, flag_per_row>(find_result);
if (used_once)
{
auto used_flags_opt = join_features.need_flags ? &used_flags : nullptr;
setUsed<need_filter>(added_columns.filter, i);
addFoundRowAll<Map, join_features.add_missing>(
mapped, added_columns, current_offset, known_rows, used_flags_opt);
}
}
else if constexpr (join_features.is_any_join && KIND == JoinKind::Inner)
{
bool used_once = used_flags.template setUsedOnce<join_features.need_flags, flag_per_row>(find_result);
/// Use first appeared left key only
if (used_once)
{
setUsed<need_filter>(added_columns.filter, i);
added_columns.appendFromBlock(&mapped, join_features.add_missing);
}
break;
}
else if constexpr (join_features.is_any_join && join_features.full)
{
/// TODO
}
else if constexpr (join_features.is_anti_join)
{
if constexpr (join_features.right && join_features.need_flags)
used_flags.template setUsed<join_features.need_flags, flag_per_row>(find_result);
}
else /// ANY LEFT, SEMI LEFT, old ANY (RightAny)
{
setUsed<need_filter>(added_columns.filter, i);
used_flags.template setUsed<join_features.need_flags, flag_per_row>(find_result);
added_columns.appendFromBlock(&mapped, join_features.add_missing);
if (join_features.is_any_or_semi_join)
{
break;
}
}
}
}
if (!right_row_found)
{
if constexpr (join_features.is_anti_join && join_features.left)
setUsed<need_filter>(added_columns.filter, i);
addNotFoundRow<join_features.add_missing, join_features.need_replication>(added_columns, current_offset);
}
if constexpr (join_features.need_replication)
{
(*added_columns.offsets_to_replicate)[i] = current_offset;
}
}
added_columns.applyLazyDefaults();
return i;
}
template <bool need_filter>
static void setUsed(IColumn::Filter & filter [[maybe_unused]], size_t pos [[maybe_unused]]);
@ -131,7 +266,120 @@ private:
size_t left_start_row,
const std::vector<const RowRef *> & selected_rows,
const std::vector<size_t> & row_replicate_offset,
AddedColumns & added_columns);
AddedColumns & added_columns)
{
ColumnPtr result_column;
do
{
if (selected_rows.empty())
{
result_column = ColumnUInt8::create();
break;
}
const Block & sample_right_block = *((*selected_rows.begin())->block);
if (!sample_right_block || !added_columns.additional_filter_expression)
{
auto filter = ColumnUInt8::create();
filter->insertMany(1, selected_rows.size());
result_column = std::move(filter);
break;
}
auto required_cols = added_columns.additional_filter_expression->getRequiredColumnsWithTypes();
if (required_cols.empty())
{
Block block;
added_columns.additional_filter_expression->execute(block);
result_column = block.getByPosition(0).column->cloneResized(selected_rows.size());
break;
}
NameSet required_column_names;
for (auto & col : required_cols)
required_column_names.insert(col.name);
Block executed_block;
size_t right_col_pos = 0;
for (const auto & col : sample_right_block.getColumnsWithTypeAndName())
{
if (required_column_names.contains(col.name))
{
auto new_col = col.column->cloneEmpty();
for (const auto & selected_row : selected_rows)
{
const auto & src_col = selected_row->block->getByPosition(right_col_pos);
new_col->insertFrom(*src_col.column, selected_row->row_num);
}
executed_block.insert({std::move(new_col), col.type, col.name});
}
right_col_pos += 1;
}
if (!executed_block)
{
result_column = ColumnUInt8::create();
break;
}
for (const auto & col_name : required_column_names)
{
const auto * src_col = added_columns.left_block.findByName(col_name);
if (!src_col)
continue;
auto new_col = src_col->column->cloneEmpty();
size_t prev_left_offset = 0;
for (size_t i = 1; i < row_replicate_offset.size(); ++i)
{
const size_t & left_offset = row_replicate_offset[i];
size_t rows = left_offset - prev_left_offset;
if (rows)
new_col->insertManyFrom(*src_col->column, left_start_row + i - 1, rows);
prev_left_offset = left_offset;
}
executed_block.insert({std::move(new_col), src_col->type, col_name});
}
if (!executed_block)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"required columns: [{}], but not found any in left/right table. right table: {}, left table: {}",
required_cols.toString(),
sample_right_block.dumpNames(),
added_columns.left_block.dumpNames());
}
for (const auto & col : executed_block.getColumnsWithTypeAndName())
if (!col.column || !col.type)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Illegal nullptr column in input block: {}", executed_block.dumpStructure());
added_columns.additional_filter_expression->execute(executed_block);
result_column = executed_block.getByPosition(0).column->convertToFullColumnIfConst();
executed_block.clear();
} while (false);
result_column = result_column->convertToFullIfNeeded();
if (result_column->isNullable())
{
/// Convert Nullable(UInt8) to UInt8 ensuring that nulls are zeros
/// Trying to avoid copying data, since we are the only owner of the column.
ColumnPtr mask_column = assert_cast<const ColumnNullable &>(*result_column).getNullMapColumnPtr();
MutableColumnPtr mutable_column;
{
ColumnPtr nested_column = assert_cast<const ColumnNullable &>(*result_column).getNestedColumnPtr();
result_column.reset();
mutable_column = IColumn::mutate(std::move(nested_column));
}
auto & column_data = assert_cast<ColumnUInt8 &>(*mutable_column).getData();
const auto & mask_column_data = assert_cast<const ColumnUInt8 &>(*mask_column).getData();
for (size_t i = 0; i < column_data.size(); ++i)
{
if (mask_column_data[i])
column_data[i] = 0;
}
return mutable_column;
}
return result_column;
}
/// First to collect all matched rows refs by join keys, then filter out rows which are not true in additional filter expression.
template <typename KeyGetter, typename Map, typename AddedColumns>
@ -141,7 +389,177 @@ private:
AddedColumns & added_columns,
JoinStuff::JoinUsedFlags & used_flags [[maybe_unused]],
bool need_filter [[maybe_unused]],
bool flag_per_row [[maybe_unused]]);
bool need_flags [[maybe_unused]],
bool add_missing [[maybe_unused]],
bool flag_per_row [[maybe_unused]])
{
size_t left_block_rows = added_columns.rows_to_add;
if (need_filter)
added_columns.filter = IColumn::Filter(left_block_rows, 0);
std::unique_ptr<Arena> pool;
if constexpr (need_replication)
added_columns.offsets_to_replicate = std::make_unique<IColumn::Offsets>(left_block_rows);
std::vector<size_t> row_replicate_offset;
row_replicate_offset.reserve(left_block_rows);
using FindResult = typename KeyGetter::FindResult;
size_t max_joined_block_rows = added_columns.max_joined_block_rows;
size_t left_row_iter = 0;
PreSelectedRows selected_rows;
selected_rows.reserve(left_block_rows);
std::vector<FindResult> find_results;
find_results.reserve(left_block_rows);
bool exceeded_max_block_rows = false;
IColumn::Offset total_added_rows = 0;
IColumn::Offset current_added_rows = 0;
auto collect_keys_matched_rows_refs = [&]()
{
pool = std::make_unique<Arena>();
find_results.clear();
row_replicate_offset.clear();
row_replicate_offset.push_back(0);
current_added_rows = 0;
selected_rows.clear();
for (; left_row_iter < left_block_rows; ++left_row_iter)
{
if constexpr (need_replication)
{
if (unlikely(total_added_rows + current_added_rows >= max_joined_block_rows))
{
break;
}
}
KnownRowsHolder<true> all_flag_known_rows;
KnownRowsHolder<false> single_flag_know_rows;
for (size_t join_clause_idx = 0; join_clause_idx < added_columns.join_on_keys.size(); ++join_clause_idx)
{
const auto & join_keys = added_columns.join_on_keys[join_clause_idx];
if (join_keys.null_map && (*join_keys.null_map)[left_row_iter])
continue;
bool row_acceptable = !join_keys.isRowFiltered(left_row_iter);
auto find_result = row_acceptable
? key_getter_vector[join_clause_idx].findKey(*(mapv[join_clause_idx]), left_row_iter, *pool)
: FindResult();
if (find_result.isFound())
{
auto & mapped = find_result.getMapped();
find_results.push_back(find_result);
if (flag_per_row)
addFoundRowAll<Map, false, true>(mapped, selected_rows, current_added_rows, all_flag_known_rows, nullptr);
else
addFoundRowAll<Map, false, false>(mapped, selected_rows, current_added_rows, single_flag_know_rows, nullptr);
}
}
row_replicate_offset.push_back(current_added_rows);
}
};
auto copy_final_matched_rows = [&](size_t left_start_row, ColumnPtr filter_col)
{
const PaddedPODArray<UInt8> & filter_flags = assert_cast<const ColumnUInt8 &>(*filter_col).getData();
size_t prev_replicated_row = 0;
auto selected_right_row_it = selected_rows.begin();
size_t find_result_index = 0;
for (size_t i = 1, n = row_replicate_offset.size(); i < n; ++i)
{
bool any_matched = false;
/// For ALL RIGHT JOIN, flag_per_row is true; we need to mark used flags for each row.
if (flag_per_row)
{
for (size_t replicated_row = prev_replicated_row; replicated_row < row_replicate_offset[i]; ++replicated_row)
{
if (filter_flags[replicated_row])
{
any_matched = true;
added_columns.appendFromBlock(*selected_right_row_it, add_missing);
total_added_rows += 1;
if (need_flags)
used_flags.template setUsed<true, true>((*selected_right_row_it)->block, (*selected_right_row_it)->row_num, 0);
}
++selected_right_row_it;
}
}
else
{
for (size_t replicated_row = prev_replicated_row; replicated_row < row_replicate_offset[i]; ++replicated_row)
{
if (filter_flags[replicated_row])
{
any_matched = true;
added_columns.appendFromBlock(*selected_right_row_it, add_missing);
total_added_rows += 1;
}
++selected_right_row_it;
}
}
if (!any_matched)
{
if (add_missing)
addNotFoundRow<true, need_replication>(added_columns, total_added_rows);
else
addNotFoundRow<false, need_replication>(added_columns, total_added_rows);
}
else
{
if (!flag_per_row && need_flags)
used_flags.template setUsed<true, false>(find_results[find_result_index]);
if (need_filter)
setUsed<true>(added_columns.filter, left_start_row + i - 1);
if (add_missing)
added_columns.applyLazyDefaults();
}
find_result_index += (prev_replicated_row != row_replicate_offset[i]);
if constexpr (need_replication)
{
(*added_columns.offsets_to_replicate)[left_start_row + i - 1] = total_added_rows;
}
prev_replicated_row = row_replicate_offset[i];
}
};
while (left_row_iter < left_block_rows && !exceeded_max_block_rows)
{
auto left_start_row = left_row_iter;
collect_keys_matched_rows_refs();
if (selected_rows.size() != current_added_rows || row_replicate_offset.size() != left_row_iter - left_start_row + 1)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Sizes are mismatched. selected_rows.size:{}, current_added_rows:{}, row_replicate_offset.size:{}, left_row_iter: {}, "
"left_start_row: {}",
selected_rows.size(),
current_added_rows,
row_replicate_offset.size(),
left_row_iter,
left_start_row);
}
auto filter_col = buildAdditionalFilter(left_start_row, selected_rows, row_replicate_offset, added_columns);
copy_final_matched_rows(left_start_row, filter_col);
if constexpr (need_replication)
{
// Add a check on current_added_rows to avoid running the filter expression on too small a batch.
if (total_added_rows >= max_joined_block_rows || current_added_rows < 1024)
exceeded_max_block_rows = true;
}
}
if constexpr (need_replication)
{
added_columns.offsets_to_replicate->resize_assume_reserved(left_row_iter);
added_columns.filter.resize_assume_reserved(left_row_iter);
}
added_columns.applyLazyDefaults();
return left_row_iter;
}
/// Cut first num_rows rows from block in place and returns block with remaining rows
static Block sliceBlock(Block & block, size_t num_rows);
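Aside on buildAdditionalFilter above (a simplified analogue, not ClickHouse code): when the additional filter expression yields Nullable(UInt8), the result is turned into a plain UInt8 mask by zeroing every position where the null map is set, so a NULL filter value never lets a row through:

    #include <cstddef>
    #include <cstdint>
    #include <vector>

    /// data: nested UInt8 values of the Nullable column; null_mask: 1 where the value is NULL.
    std::vector<uint8_t> applyNullMask(std::vector<uint8_t> data, const std::vector<uint8_t> & null_mask)
    {
        for (size_t i = 0; i < data.size(); ++i)
            if (null_mask[i])
                data[i] = 0;  /// NULL in the additional filter means "row not matched"
        return data;
    }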

View File

@ -115,6 +115,7 @@ public:
/// Peek next stream of delayed joined blocks.
virtual IBlocksStreamPtr getDelayedBlocks() { return nullptr; }
virtual bool hasDelayedBlocks() const { return false; }
virtual void tryRerangeRightTableData() {}
virtual IBlocksStreamPtr
getNonJoinedBlocks(const Block & left_sample_block, const Block & result_sample_block, UInt64 max_block_size) const = 0;
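The hook is added as a no-op virtual so other IJoin implementations are untouched; only HashJoin overrides it (see HashJoin.h above). The pattern, with hypothetical class names:

    struct IJoinLike
    {
        virtual ~IJoinLike() = default;
        virtual void tryRerangeRightTableData() {}  /// default: do nothing
    };

    struct HashJoinLike : IJoinLike
    {
        void tryRerangeRightTableData() override { /* re-range the stored right-table rows */ }
    };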

View File

@ -123,6 +123,7 @@ struct RowRefList : RowRef
RowRefList() {} /// NOLINT
RowRefList(const Block * block_, size_t row_num_) : RowRef(block_, row_num_), rows(1) {}
RowRefList(const Block * block_, size_t row_start_, size_t rows_) : RowRef(block_, row_start_), rows(static_cast<SizeT>(rows_)) {}
ForwardIterator begin() const { return ForwardIterator(this); }

View File

@ -299,13 +299,17 @@ IProcessor::Status FillingRightJoinSideTransform::prepare()
void FillingRightJoinSideTransform::work()
{
auto block = inputs.front().getHeader().cloneWithColumns(chunk.detachColumns());
auto & input = inputs.front();
auto block = input.getHeader().cloneWithColumns(chunk.detachColumns());
if (for_totals)
join->setTotals(block);
else
stop_reading = !join->addBlockToJoin(block);
if (input.isFinished())
join->tryRerangeRightTableData();
set_totals = for_totals;
}