Merge branch 'master' into variant_inference

Shaun Struwig, 2024-06-11 11:59:59 +02:00, committed by GitHub
commit dd8e434b51
15 changed files with 137 additions and 21 deletions

View File

@@ -2165,6 +2165,8 @@ To exchange data with Hadoop, you can use [HDFS table engine](/docs/en/engines/t
- [output_format_parquet_fixed_string_as_fixed_byte_array](/docs/en/operations/settings/settings-formats.md/#output_format_parquet_fixed_string_as_fixed_byte_array) - use Parquet FIXED_LENGTH_BYTE_ARRAY type instead of Binary/String for FixedString columns. Default value - `true`.
- [output_format_parquet_version](/docs/en/operations/settings/settings-formats.md/#output_format_parquet_version) - The version of Parquet format used in output format. Default value - `2.latest`.
- [output_format_parquet_compression_method](/docs/en/operations/settings/settings-formats.md/#output_format_parquet_compression_method) - compression method used in output Parquet format. Default value - `lz4`.
- [input_format_parquet_max_block_size](/docs/en/operations/settings/settings-formats.md/#input_format_parquet_max_block_size) - max number of rows per block for the Parquet reader. Default value - `65409`.
- [input_format_parquet_prefer_block_bytes](/docs/en/operations/settings/settings-formats.md/#input_format_parquet_prefer_block_bytes) - average target block size in bytes output by the Parquet reader. Default value - `16744704`.
## ParquetMetadata {#data-format-parquet-metadata}

View File

@@ -1417,6 +1417,17 @@ Compression method used in output Parquet format. Supported codecs: `snappy`, `l
Default value: `lz4`.
### input_format_parquet_max_block_size {#input_format_parquet_max_block_size}
The maximum number of rows per block for the Parquet reader. By limiting the number of rows in each block, you can control memory usage, and operators that cache blocks can account for their memory more accurately.
Default value: `65409`.
### input_format_parquet_prefer_block_bytes {#input_format_parquet_prefer_block_bytes}
The average block size in bytes that the Parquet reader should output. The reader estimates the average row size from each row group's uncompressed size and targets roughly `prefer_block_bytes / average_row_bytes` rows per block, clamped between 128 rows and `input_format_parquet_max_block_size`. Lowering this value relieves memory pressure when reading highly compressed Parquet files.
Default value: `65409 * 256 = 16744704`.
## Hive format settings {#hive-format-settings}
### input_format_hive_text_fields_delimiter {#input_format_hive_text_fields_delimiter}

View File

@@ -1,5 +1,7 @@
#include <Analyzer/FunctionNode.h>
#include <Columns/ColumnConst.h>
#include <Common/SipHash.h>
#include <Common/FieldVisitorToString.h>
@@ -58,12 +60,20 @@ ColumnsWithTypeAndName FunctionNode::getArgumentColumns() const
ColumnWithTypeAndName argument_column;
auto * constant = argument->as<ConstantNode>();
if (isNameOfInFunction(function_name) && i == 1)
{
    /// The second argument of IN is a set, not an ordinary column.
    argument_column.type = std::make_shared<DataTypeSet>();
    if (constant)
    {
        /// Created but not filled for the analysis during function resolution.
        FutureSetPtr empty_set;
        argument_column.column = ColumnConst::create(ColumnSet::create(1, empty_set), 1);
    }
}
else
    argument_column.type = argument->getResultType();

if (constant && !isNotCreatable(argument_column.type))
    argument_column.column = argument_column.type->createColumnConst(1, constant->getValue());

View File

@@ -551,14 +551,25 @@ private:
in_function->getArguments().getNodes() = std::move(in_arguments);
in_function->resolveAsFunction(in_function_resolver);

DataTypePtr result_type = in_function->getResultType();
const auto * type_low_cardinality = typeid_cast<const DataTypeLowCardinality *>(result_type.get());
if (type_low_cardinality)
    result_type = type_low_cardinality->getDictionaryType();

/** For `k :: UInt8`, expression `k = 1 OR k = NULL` with result type Nullable(UInt8)
  * is replaced with `k IN (1, NULL)` with result type UInt8.
  * Convert it back to Nullable(UInt8).
  * And for `k :: LowCardinality(UInt8)`, the transformation of `k IN (1, NULL)` results in type LowCardinality(UInt8).
  * Convert it to LowCardinality(Nullable(UInt8)).
  */
if (is_any_nullable && !result_type->isNullable())
{
    /// Wrap the dictionary type in Nullable, then restore LowCardinality on top.
    DataTypePtr new_result_type = std::make_shared<DataTypeNullable>(result_type);
    if (type_low_cardinality)
    {
        new_result_type = std::make_shared<DataTypeLowCardinality>(new_result_type);
    }
    auto in_function_nullable = createCastFunction(std::move(in_function), std::move(new_result_type), getContext());
    or_operands.push_back(std::move(in_function_nullable));
}
else

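For intuition, here is a standalone toy model of the cast-type computation above, with ClickHouse types mocked as strings; `castTargetType` is an illustrative name, not analyzer API:

#include <iostream>
#include <string>

// Toy model of the cast-type fix-up: unwrap LowCardinality to its dictionary
// type, wrap that in Nullable, then restore LowCardinality on top.
std::string castTargetType(std::string result_type)
{
    const std::string lc = "LowCardinality(";
    const bool low_cardinality = result_type.rfind(lc, 0) == 0;
    if (low_cardinality)
        result_type = result_type.substr(lc.size(), result_type.size() - lc.size() - 1);
    const std::string wrapped = "Nullable(" + result_type + ")";
    return low_cardinality ? lc + wrapped + ")" : wrapped;
}

int main()
{
    std::cout << castTargetType("UInt8") << '\n';                 // Nullable(UInt8)
    std::cout << castTargetType("LowCardinality(UInt8)") << '\n'; // LowCardinality(Nullable(UInt8))
}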
View File

@@ -1060,7 +1060,8 @@ class IColumn;
M(Bool, input_format_tsv_detect_header, true, "Automatically detect header with names and types in TSV format", 0) \
M(Bool, input_format_custom_detect_header, true, "Automatically detect header with names and types in CustomSeparated format", 0) \
M(Bool, input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format Parquet", 0) \
M(UInt64, input_format_parquet_max_block_size, DEFAULT_BLOCK_SIZE, "Max block size for parquet reader.", 0) \
M(UInt64, input_format_parquet_prefer_block_bytes, DEFAULT_BLOCK_SIZE * 256, "Average block bytes output by parquet reader", 0) \
M(Bool, input_format_protobuf_skip_fields_with_unsupported_types_in_schema_inference, false, "Skip fields with unsupported types while schema inference for format Protobuf", 0) \
M(Bool, input_format_capn_proto_skip_fields_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format CapnProto", 0) \
M(Bool, input_format_orc_skip_columns_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format ORC", 0) \

View File

@@ -97,6 +97,8 @@ static const std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges
{"azure_ignore_file_doesnt_exist", false, false, "Allow to return 0 rows when the requested files don't exist instead of throwing an exception in AzureBlobStorage table engine"},
{"s3_ignore_file_doesnt_exist", false, false, "Allow to return 0 rows when the requested files don't exist instead of throwing an exception in S3 table engine"},
{"input_format_try_infer_variants", 0, 0, "Try to infer Variant type in text formats when there is more than one possible type for column/array elements"},
{"input_format_parquet_max_block_size", 8192, DEFAULT_BLOCK_SIZE, "Increase block size for parquet reader."},
{"input_format_parquet_prefer_block_bytes", 0, DEFAULT_BLOCK_SIZE * 256, "Average block bytes output by parquet reader."},
{"enable_blob_storage_log", true, true, "Write information about blob storage operations to system.blob_storage_log table"},
{"allow_statistic_optimize", false, false, "Old setting which popped up here being renamed."},
{"allow_experimental_statistic", false, false, "Old setting which popped up here being renamed."},

View File

@@ -161,6 +161,7 @@ FormatSettings getFormatSettings(const ContextPtr & context, const Settings & se
format_settings.parquet.output_string_as_string = settings.output_format_parquet_string_as_string;
format_settings.parquet.output_fixed_string_as_fixed_byte_array = settings.output_format_parquet_fixed_string_as_fixed_byte_array;
format_settings.parquet.max_block_size = settings.input_format_parquet_max_block_size;
format_settings.parquet.prefer_block_bytes = settings.input_format_parquet_prefer_block_bytes;
format_settings.parquet.output_compression_method = settings.output_format_parquet_compression_method;
format_settings.parquet.output_compliant_nested_types = settings.output_format_parquet_compliant_nested_types;
format_settings.parquet.use_custom_encoder = settings.output_format_parquet_use_custom_encoder;

View File

@@ -266,7 +266,8 @@ struct FormatSettings
bool preserve_order = false;
bool use_custom_encoder = true;
bool parallel_encoding = true;
UInt64 max_block_size = DEFAULT_BLOCK_SIZE;
size_t prefer_block_bytes = DEFAULT_BLOCK_SIZE * 256;
ParquetVersion output_version;
ParquetCompression output_compression_method = ParquetCompression::SNAPPY;
bool output_compliant_nested_types = true;

View File

@@ -111,7 +111,7 @@ public:
argument_types.push_back(argument.type);
/// More efficient specialization for two numeric arguments.
if (arguments.size() == 2 && isNumber(removeNullable(arguments[0].type)) && isNumber(removeNullable(arguments[1].type)))
return std::make_unique<FunctionToFunctionBaseAdaptor>(SpecializedFunction::create(context), argument_types, return_type);
return std::make_unique<FunctionToFunctionBaseAdaptor>(
@@ -123,7 +123,7 @@ public:
if (types.empty())
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Function {} cannot be called without arguments", getName());
if (types.size() == 2 && isNumber(removeNullable(types[0])) && isNumber(removeNullable(types[1])))
return SpecializedFunction::create(context)->getReturnTypeImpl(types);
return getLeastSupertype(types);
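Why `removeNullable` here: `isNumber(Nullable(Int32))` is false, so Nullable numeric pairs previously fell through to the generic least-supertype path. A standalone toy of the behavior being specialized, assuming the usual NULL-propagating semantics of `least`; names are illustrative, not ClickHouse internals:

#include <algorithm>
#include <iostream>
#include <optional>

// Toy stand-in for the two-argument numeric fast path, extended to
// Nullable inputs: a NULL argument makes the result NULL.
std::optional<double> least2(std::optional<double> a, std::optional<double> b)
{
    if (!a || !b)
        return std::nullopt;
    return std::min(*a, *b);
}

int main()
{
    std::cout << least2(1.0, 2.0).value_or(-1) << '\n';                  // 1
    std::cout << (least2(std::nullopt, 2.0) ? "value" : "NULL") << '\n'; // NULL
}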

View File

@@ -420,6 +420,24 @@ void ParquetBlockInputFormat::initializeIfNeeded()
int num_row_groups = metadata->num_row_groups();
row_group_batches.reserve(num_row_groups);
auto adaptive_chunk_size = [&](int row_group_idx) -> size_t
{
    // Estimate how many rows of this row group fit into prefer_block_bytes,
    // based on the uncompressed sizes of the columns we will actually read.
    size_t total_size = 0;
    auto row_group_meta = metadata->RowGroup(row_group_idx);
    for (int column_index : column_indices)
    {
        total_size += row_group_meta->ColumnChunk(column_index)->total_uncompressed_size();
    }
    if (!total_size || !format_settings.parquet.prefer_block_bytes)
        return 0;
    auto average_row_bytes = floor(static_cast<double>(total_size) / row_group_meta->num_rows());
    // Avoid an infinite preferred_num_rows when rows average less than one byte.
    if (average_row_bytes < 1)
        return 0;
    const size_t preferred_num_rows = static_cast<size_t>(floor(format_settings.parquet.prefer_block_bytes / average_row_bytes));
    const size_t MIN_ROW_NUM = 128;
    // size_t is not the same type as UInt64 on Darwin, hence the explicit cast.
    return std::min(std::max(preferred_num_rows, MIN_ROW_NUM), static_cast<size_t>(format_settings.parquet.max_block_size));
};
for (int row_group = 0; row_group < num_row_groups; ++row_group)
{
if (skip_row_groups.contains(row_group))
@@ -439,6 +457,8 @@ void ParquetBlockInputFormat::initializeIfNeeded()
row_group_batches.back().row_groups_idxs.push_back(row_group);
row_group_batches.back().total_rows += metadata->RowGroup(row_group)->num_rows();
row_group_batches.back().total_bytes_compressed += metadata->RowGroup(row_group)->total_compressed_size();
auto rows = adaptive_chunk_size(row_group);
row_group_batches.back().adaptive_chunk_size = rows ? rows : format_settings.parquet.max_block_size;
}
}
@@ -449,7 +469,7 @@ void ParquetBlockInputFormat::initializeRowGroupBatchReader(size_t row_group_bat
parquet::ArrowReaderProperties arrow_properties;
parquet::ReaderProperties reader_properties(ArrowMemoryPool::instance());
arrow_properties.set_use_threads(false);
arrow_properties.set_batch_size(row_group_batch.adaptive_chunk_size);
// When reading a row group, arrow will:
// 1. Look at `metadata` to get all byte ranges it'll need to read from the file (typically one

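The adaptive value ultimately lands in Arrow's reader properties. For reference, the same two knobs in plain Arrow C++ (a sketch assuming a recent Arrow; `makeReaderProps` is an illustrative helper, not ClickHouse code):

#include <cstdint>
#include <parquet/properties.h> // parquet::ArrowReaderProperties

// Cap the rows per record batch emitted by the Arrow Parquet reader; this is
// the knob the adaptive chunk size feeds, per row-group batch.
parquet::ArrowReaderProperties makeReaderProps(int64_t batch_rows)
{
    parquet::ArrowReaderProperties props;
    props.set_batch_size(batch_rows); // rows per emitted batch
    props.set_use_threads(false);     // ClickHouse schedules its own parallelism
    return props;
}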
View File

@@ -208,6 +208,8 @@ private:
size_t total_rows = 0;
size_t total_bytes_compressed = 0;
size_t adaptive_chunk_size = 0;
std::vector<int> row_groups_idxs;
// These are only used by the decoding thread, so don't require locking the mutex.

View File

@@ -0,0 +1,10 @@
<test>
<create_query>CREATE TABLE test (id Int32, x1 Nullable(Int32), x2 Nullable(Float32)) ENGINE = MergeTree() ORDER BY id</create_query>
<fill_query>INSERT INTO test SELECT number, number+1, number + 2 FROM numbers(1000000)</fill_query>
<query tag='LEAST'>SELECT COUNT(1) FROM test WHERE least(x1, x2) > 1</query>
<query tag='GREATEST'>SELECT COUNT(1) FROM test WHERE GREATEST(x1, x2) > 1</query>
<drop_query>DROP TABLE IF EXISTS test</drop_query>
</test>

View File

@@ -10,11 +10,15 @@ QUERY id: 0
JOIN TREE
TABLE id: 3, alias: __table1, table_name: default.t_logical_expressions_optimizer_low_cardinality
WHERE
FUNCTION id: 4, function_name: or, function_type: ordinary, result_type: UInt8
ARGUMENTS
LIST id: 5, nodes: 2
FUNCTION id: 6, function_name: in, function_type: ordinary, result_type: LowCardinality(UInt8)
ARGUMENTS
LIST id: 7, nodes: 2
COLUMN id: 2, column_name: a, result_type: LowCardinality(String), source_id: 3
CONSTANT id: 8, constant_value: Tuple_(\'x\', \'y\'), constant_value_type: Tuple(String, String)
CONSTANT id: 9, constant_value: UInt64_0, constant_value_type: UInt8
SETTINGS allow_experimental_analyzer=1
SELECT a
FROM t_logical_expressions_optimizer_low_cardinality
@@ -28,11 +32,15 @@ QUERY id: 0
JOIN TREE
TABLE id: 3, alias: __table1, table_name: default.t_logical_expressions_optimizer_low_cardinality
WHERE
FUNCTION id: 4, function_name: or, function_type: ordinary, result_type: UInt8
ARGUMENTS
LIST id: 5, nodes: 2
FUNCTION id: 6, function_name: in, function_type: ordinary, result_type: LowCardinality(UInt8)
ARGUMENTS
LIST id: 7, nodes: 2
COLUMN id: 2, column_name: a, result_type: LowCardinality(String), source_id: 3
CONSTANT id: 8, constant_value: Tuple_(\'x\', \'y\'), constant_value_type: Tuple(String, String)
CONSTANT id: 9, constant_value: UInt64_0, constant_value_type: UInt8
SETTINGS allow_experimental_analyzer=1
SELECT a
FROM t_logical_expressions_optimizer_low_cardinality
@@ -46,11 +54,15 @@ QUERY id: 0
JOIN TREE
TABLE id: 3, alias: __table1, table_name: default.t_logical_expressions_optimizer_low_cardinality
WHERE
FUNCTION id: 4, function_name: _CAST, function_type: ordinary, result_type: UInt8
ARGUMENTS
LIST id: 5, nodes: 2
FUNCTION id: 6, function_name: notIn, function_type: ordinary, result_type: LowCardinality(UInt8)
ARGUMENTS
LIST id: 7, nodes: 2
COLUMN id: 2, column_name: a, result_type: LowCardinality(String), source_id: 3
CONSTANT id: 8, constant_value: Tuple_(\'x\', \'y\'), constant_value_type: Tuple(String, String)
CONSTANT id: 9, constant_value: \'UInt8\', constant_value_type: String
SETTINGS allow_experimental_analyzer=1
SELECT a
FROM t_logical_expressions_optimizer_low_cardinality
@@ -64,11 +76,15 @@ QUERY id: 0
JOIN TREE
TABLE id: 3, alias: __table1, table_name: default.t_logical_expressions_optimizer_low_cardinality
WHERE
FUNCTION id: 4, function_name: _CAST, function_type: ordinary, result_type: UInt8
ARGUMENTS
LIST id: 5, nodes: 2
FUNCTION id: 6, function_name: notIn, function_type: ordinary, result_type: LowCardinality(UInt8)
ARGUMENTS
LIST id: 7, nodes: 2
COLUMN id: 2, column_name: a, result_type: LowCardinality(String), source_id: 3
CONSTANT id: 8, constant_value: Tuple_(\'x\', \'y\'), constant_value_type: Tuple(String, String)
CONSTANT id: 9, constant_value: \'UInt8\', constant_value_type: String
SETTINGS allow_experimental_analyzer=1
SELECT a
FROM t_logical_expressions_optimizer_low_cardinality

View File

@@ -0,0 +1,4 @@
65409
16
128
2363

View File

@@ -0,0 +1,25 @@
-- Tags: no-fasttest, no-parallel, no-random-settings
set max_insert_threads=1;
DROP TABLE IF EXISTS test_parquet;
CREATE TABLE test_parquet (col1 String, col2 String, col3 String, col4 String, col5 String, col6 String, col7 String) ENGINE=File(Parquet);
INSERT INTO test_parquet SELECT rand(),rand(),rand(),rand(),rand(),rand(),rand() FROM numbers(100000);
SELECT max(blockSize()) FROM test_parquet;
DROP TABLE IF EXISTS test_parquet;
CREATE TABLE test_parquet (col1 String, col2 String, col3 String, col4 String, col5 String, col6 String, col7 String) ENGINE=File(Parquet) settings input_format_parquet_max_block_size=16;
INSERT INTO test_parquet SELECT rand(),rand(),rand(),rand(),rand(),rand(),rand() FROM numbers(100000);
SELECT max(blockSize()) FROM test_parquet;
DROP TABLE IF EXISTS test_parquet;
CREATE TABLE test_parquet (col1 String, col2 String, col3 String, col4 String, col5 String, col6 String, col7 String) ENGINE=File(Parquet) settings input_format_parquet_prefer_block_bytes=30;
INSERT INTO test_parquet SELECT rand(),rand(),rand(),rand(),rand(),rand(),rand() FROM numbers(100000);
SELECT max(blockSize()) FROM test_parquet;
DROP TABLE IF EXISTS test_parquet;
CREATE TABLE test_parquet (col1 String, col2 String, col3 String, col4 String, col5 String, col6 String, col7 String) ENGINE=File(Parquet) settings input_format_parquet_prefer_block_bytes=30720;
INSERT INTO test_parquet SELECT rand(),rand(),rand(),rand(),rand(),rand(),rand() FROM numbers(100000);
SELECT max(blockSize()) FROM test_parquet;
DROP TABLE IF EXISTS test_parquet;
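For intuition about the four reference values above (65409, 16, 128, 2363), here is a standalone mirror of the reader's clamp applied to this test's settings; the `avg_row_bytes = 13` input is an assumption consistent with these outputs for this generated file, not something the test asserts:

#include <algorithm>
#include <cstddef>
#include <iostream>

// Standalone mirror of adaptive_chunk_size: prefer_block_bytes / avg_row_bytes,
// clamped to [MIN_ROW_NUM = 128, max_block_size]; the fallback is max_block_size.
size_t adaptiveRows(size_t prefer_block_bytes, double avg_row_bytes, size_t max_block_size)
{
    if (!prefer_block_bytes || avg_row_bytes < 1)
        return max_block_size;
    const auto preferred = static_cast<size_t>(prefer_block_bytes / avg_row_bytes);
    return std::min(std::max(preferred, size_t{128}), max_block_size);
}

int main()
{
    std::cout << adaptiveRows(16744704, 13, 65409) << '\n'; // 65409: capped by max_block_size
    std::cout << adaptiveRows(16744704, 13, 16) << '\n';    // 16: input_format_parquet_max_block_size=16
    std::cout << adaptiveRows(30, 13, 65409) << '\n';       // 128: 30/13 ~ 2, raised to MIN_ROW_NUM
    std::cout << adaptiveRows(30720, 13, 65409) << '\n';    // 2363 = floor(30720 / 13)
}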