change as request

This commit is contained in:
taiyang-li 2023-08-21 12:09:02 +08:00
parent 9866d2727e
commit f723e8d43a
7 changed files with 86 additions and 105 deletions

View File

@ -871,6 +871,7 @@ class IColumn;
M(Bool, input_format_parquet_preserve_order, false, "Avoid reordering rows when reading from Parquet files. Usually makes it much slower.", 0) \
M(Bool, input_format_allow_seeks, true, "Allow seeks while reading in ORC/Parquet/Arrow input formats", 0) \
M(Bool, input_format_orc_allow_missing_columns, false, "Allow missing columns while reading ORC input formats", 0) \
M(Bool, input_format_orc_use_fast_decoder, true, "Use a faster ORC decoder implementation.", 0) \
M(Bool, input_format_parquet_allow_missing_columns, false, "Allow missing columns while reading Parquet input formats", 0) \
M(Bool, input_format_arrow_allow_missing_columns, false, "Allow missing columns while reading Arrow input formats", 0) \
M(Char, input_format_hive_text_fields_delimiter, '\x01', "Delimiter between fields in Hive Text File", 0) \

View File

@ -185,6 +185,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.orc.case_insensitive_column_matching = settings.input_format_orc_case_insensitive_column_matching;
format_settings.orc.output_string_as_string = settings.output_format_orc_string_as_string;
format_settings.orc.output_compression_method = settings.output_format_orc_compression_method;
format_settings.orc.use_fast_decoder = settings.input_format_orc_use_fast_decoder;
format_settings.defaults_for_omitted_fields = settings.input_format_defaults_for_omitted_fields;
format_settings.capn_proto.enum_comparing_mode = settings.format_capn_proto_enum_comparising_mode;
format_settings.capn_proto.skip_fields_with_unsupported_types_in_schema_inference = settings.input_format_capn_proto_skip_fields_with_unsupported_types_in_schema_inference;

View File

@ -343,6 +343,7 @@ struct FormatSettings
std::unordered_set<int> skip_stripes = {};
bool output_string_as_string = false;
ORCCompression output_compression_method = ORCCompression::NONE;
bool use_fast_decoder = true;
} orc;
/// For capnProto format we should determine how to

View File

@ -1,5 +1,4 @@
#include "NativeORCBlockInputFormat.h"
#include "ArrowBufferedStreams.h"
#if USE_ORC
# include <Columns/ColumnDecimal.h>
@ -30,6 +29,7 @@
# include <IO/copyData.h>
# include <Interpreters/castColumn.h>
# include <boost/algorithm/string/case_conv.hpp>
# include "ArrowBufferedStreams.h"
namespace DB
@ -42,6 +42,7 @@ namespace ErrorCodes
extern const int VALUE_IS_OUT_OF_RANGE_OF_DATA_TYPE;
extern const int THERE_IS_NO_COLUMN;
extern const int INCORRECT_DATA;
extern const int ARGUMENT_OUT_OF_BOUND;
}
ORCInputStream::ORCInputStream(SeekableReadBuffer & in_, size_t file_size_) : in(in_), file_size(file_size_)
@ -74,7 +75,7 @@ std::unique_ptr<orc::InputStream> asORCInputStream(ReadBuffer & in, const Format
if (has_file_size && seekable_in && settings.seekable_read && seekable_in->checkIfActuallySeekable())
return std::make_unique<ORCInputStream>(*seekable_in, getFileSizeFromReadBuffer(in));
// fallback to loading the entire file in memory
/// Fallback to loading the entire file in memory
return asORCInputStreamLoadIntoMemory(in, is_cancelled);
}
@ -196,12 +197,12 @@ static void getFileReaderAndSchema(
}
}
ORCBlockInputFormat::ORCBlockInputFormat(ReadBuffer & in_, Block header_, const FormatSettings & format_settings_)
NativeORCBlockInputFormat::NativeORCBlockInputFormat(ReadBuffer & in_, Block header_, const FormatSettings & format_settings_)
: IInputFormat(std::move(header_), &in_), format_settings(format_settings_), skip_stripes(format_settings.orc.skip_stripes)
{
}
void ORCBlockInputFormat::prepareFileReader()
void NativeORCBlockInputFormat::prepareFileReader()
{
Block schema;
getFileReaderAndSchema(*in, file_reader, schema, format_settings, is_stopped);
@ -228,7 +229,7 @@ void ORCBlockInputFormat::prepareFileReader()
}
}
bool ORCBlockInputFormat::prepareStripeReader()
bool NativeORCBlockInputFormat::prepareStripeReader()
{
assert(file_reader);
@ -255,7 +256,7 @@ bool ORCBlockInputFormat::prepareStripeReader()
return true;
}
Chunk ORCBlockInputFormat::generate()
Chunk NativeORCBlockInputFormat::generate()
{
block_missing_values.clear();
@ -291,7 +292,7 @@ Chunk ORCBlockInputFormat::generate()
return res;
}
void ORCBlockInputFormat::resetParser()
void NativeORCBlockInputFormat::resetParser()
{
IInputFormat::resetParser();
@ -302,17 +303,17 @@ void ORCBlockInputFormat::resetParser()
block_missing_values.clear();
}
const BlockMissingValues & ORCBlockInputFormat::getMissingValues() const
const BlockMissingValues & NativeORCBlockInputFormat::getMissingValues() const
{
return block_missing_values;
}
ORCSchemaReader::ORCSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_)
NativeORCSchemaReader::NativeORCSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_)
: ISchemaReader(in_), format_settings(format_settings_)
{
}
NamesAndTypesList ORCSchemaReader::readSchema()
NamesAndTypesList NativeORCSchemaReader::readSchema()
{
Block header;
std::unique_ptr<orc::Reader> file_reader;
@ -493,7 +494,7 @@ readColumnWithFixedStringData(const orc::ColumnVectorBatch * orc_column, const o
if (orc_str_column->data[i])
column_chars_t.insert_assume_reserved(orc_str_column->data[i], orc_str_column->data[i] + orc_str_column->length[i]);
else
column_chars_t.resize(column_chars_t.size() + fixed_len);
column_chars_t.resize_fill(column_chars_t.size() + fixed_len);
}
return {std::move(internal_column), std::move(internal_type), column_name};
@ -575,7 +576,7 @@ readIPv4ColumnWithInt32Data(const orc::ColumnVectorBatch * orc_column, const orc
return {std::move(internal_column), std::move(internal_type), column_name};
}
template <typename ColumnType, typename ValueType = typename ColumnType::ValueType>
template <typename ColumnType>
static ColumnWithTypeAndName readColumnWithBigNumberFromBinaryData(
const orc::ColumnVectorBatch * orc_column, const orc::Type *, const String & column_name, const DataTypePtr & column_type)
{
@ -590,7 +591,17 @@ static ColumnWithTypeAndName readColumnWithBigNumberFromBinaryData(
if (!orc_str_column->data[i]) [[unlikely]]
integer_column.insertDefault();
else
{
if (sizeof(typename ColumnType::ValueType) != orc_str_column->length[i])
throw Exception(
ErrorCodes::INCORRECT_DATA,
"ValueType size {} of column {} is not equal to size of binary data {}",
sizeof(typename ColumnType::ValueType),
integer_column.getName(),
orc_str_column->length[i]);
integer_column.insertData(orc_str_column->data[i], orc_str_column->length[i]);
}
}
return {std::move(internal_column), column_type, column_name};
}
@ -657,7 +668,6 @@ static ColumnWithTypeAndName readColumnFromORCColumn(
const orc::Type * orc_type,
const std::string & column_name,
bool inside_nullable,
bool allow_null_type,
bool skip_columns_with_unsupported_types,
bool & skipped,
DataTypePtr type_hint = nullptr)
@ -670,7 +680,7 @@ static ColumnWithTypeAndName readColumnFromORCColumn(
nested_type_hint = removeNullable(type_hint);
auto nested_column = readColumnFromORCColumn(
orc_column, orc_type, column_name, true, allow_null_type, skip_columns_with_unsupported_types, skipped, nested_type_hint);
orc_column, orc_type, column_name, true, skip_columns_with_unsupported_types, skipped, nested_type_hint);
if (skipped)
return {};
@ -753,41 +763,23 @@ static ColumnWithTypeAndName readColumnFromORCColumn(
return readColumnWithTimestampData(orc_column, orc_type, column_name);
case orc::DECIMAL: {
auto interal_type = parseORCType(orc_type);
auto precision = orc_type->getPrecision();
if (precision == 0)
precision = 38;
if (precision > 18)
{
if (precision <= DecimalUtils::max_precision<Decimal32>)
return readColumnWithDecimalDataCast<Decimal32, orc::Decimal128VectorBatch>(
orc_column, orc_type, column_name, interal_type);
else if (precision <= DecimalUtils::max_precision<Decimal64>)
return readColumnWithDecimalDataCast<Decimal64, orc::Decimal128VectorBatch>(
orc_column, orc_type, column_name, interal_type);
else if (precision <= DecimalUtils::max_precision<Decimal128>)
return readColumnWithDecimalDataCast<Decimal128, orc::Decimal128VectorBatch>(
orc_column, orc_type, column_name, interal_type);
else
return readColumnWithDecimalDataCast<Decimal256, orc::Decimal128VectorBatch>(
orc_column, orc_type, column_name, interal_type);
}
if (precision <= DecimalUtils::max_precision<Decimal32>)
return readColumnWithDecimalDataCast<Decimal32, orc::Decimal64VectorBatch>(orc_column, orc_type, column_name, interal_type);
else if (precision <= DecimalUtils::max_precision<Decimal64>)
return readColumnWithDecimalDataCast<Decimal64, orc::Decimal64VectorBatch>(orc_column, orc_type, column_name, interal_type);
else if (precision <= DecimalUtils::max_precision<Decimal128>)
return readColumnWithDecimalDataCast<Decimal128, orc::Decimal128VectorBatch>(
orc_column, orc_type, column_name, interal_type);
else
{
if (precision <= DecimalUtils::max_precision<Decimal32>)
return readColumnWithDecimalDataCast<Decimal32, orc::Decimal64VectorBatch>(
orc_column, orc_type, column_name, interal_type);
else if (precision <= DecimalUtils::max_precision<Decimal64>)
return readColumnWithDecimalDataCast<Decimal64, orc::Decimal64VectorBatch>(
orc_column, orc_type, column_name, interal_type);
else if (precision <= DecimalUtils::max_precision<Decimal128>)
return readColumnWithDecimalDataCast<Decimal128, orc::Decimal64VectorBatch>(
orc_column, orc_type, column_name, interal_type);
else
return readColumnWithDecimalDataCast<Decimal256, orc::Decimal64VectorBatch>(
orc_column, orc_type, column_name, interal_type);
}
throw Exception(
ErrorCodes::ARGUMENT_OUT_OF_BOUND,
"Decimal precision {} in ORC type {} is out of bound",
precision,
orc_type->toString());
}
case orc::MAP: {
DataTypePtr key_type_hint;
@ -809,7 +801,7 @@ static ColumnWithTypeAndName readColumnFromORCColumn(
const auto * orc_value_type = orc_type->getSubtype(1);
auto key_column = readColumnFromORCColumn(
orc_key_column, orc_key_type, "key", false, allow_null_type, skip_columns_with_unsupported_types, skipped, key_type_hint);
orc_key_column, orc_key_type, "key", false, skip_columns_with_unsupported_types, skipped, key_type_hint);
if (skipped)
return {};
@ -822,14 +814,7 @@ static ColumnWithTypeAndName readColumnFromORCColumn(
}
auto value_column = readColumnFromORCColumn(
orc_value_column,
orc_value_type,
"value",
false,
allow_null_type,
skip_columns_with_unsupported_types,
skipped,
value_type_hint);
orc_value_column, orc_value_type, "value", false, skip_columns_with_unsupported_types, skipped, value_type_hint);
if (skipped)
return {};
@ -859,14 +844,7 @@ static ColumnWithTypeAndName readColumnFromORCColumn(
const auto * orc_nested_column = getNestedORCColumn(orc_list_column);
const auto * orc_nested_type = orc_type->getSubtype(0);
auto nested_column = readColumnFromORCColumn(
orc_nested_column,
orc_nested_type,
column_name,
false,
allow_null_type,
skip_columns_with_unsupported_types,
skipped,
nested_type_hint);
orc_nested_column, orc_nested_type, column_name, false, skip_columns_with_unsupported_types, skipped, nested_type_hint);
if (skipped)
return {};
@ -903,14 +881,7 @@ static ColumnWithTypeAndName readColumnFromORCColumn(
const auto * nested_orc_column = orc_struct_column->fields[i];
const auto * nested_orc_type = orc_type->getSubtype(i);
auto element = readColumnFromORCColumn(
nested_orc_column,
nested_orc_type,
field_name,
false,
allow_null_type,
skip_columns_with_unsupported_types,
skipped,
nested_type_hint);
nested_orc_column, nested_orc_type, field_name, false, skip_columns_with_unsupported_types, skipped, nested_type_hint);
if (skipped)
return {};
@ -986,7 +957,6 @@ void ORCColumnToCHColumn::orcColumnsToCHChunk(
orc_column_with_type.second,
nested_table_name,
false,
true,
false,
skipped,
nested_table_type)};
@ -1029,7 +999,6 @@ void ORCColumnToCHColumn::orcColumnsToCHChunk(
orc_column_with_type.second,
header_column.name,
false,
true,
false,
skipped,
header_column.type);
@ -1059,12 +1028,13 @@ void ORCColumnToCHColumn::orcColumnsToCHChunk(
res.setColumns(columns_list, num_rows);
}
/*
void registerInputFormatORC(FormatFactory & factory)
{
factory.registerInputFormat(
"ORC",
[](ReadBuffer & buf, const Block & sample, const RowInputFormatParams &, const FormatSettings & settings)
{ return std::make_shared<ORCBlockInputFormat>(buf, sample, settings); });
{ return std::make_shared<NativeORCBlockInputFormat>(buf, sample, settings); });
factory.markFormatSupportsSubsetOfColumns("ORC");
}
@ -1078,12 +1048,15 @@ void registerORCSchemaReader(FormatFactory & factory)
[](const FormatSettings & settings)
{ return fmt::format("schema_inference_make_columns_nullable={}", settings.schema_inference_make_columns_nullable); });
}
*/
}
#else
namespace DB
{
/*
class FormatFactory;
void registerInputFormatORC(FormatFactory &)
{
@ -1092,6 +1065,7 @@ void registerInputFormatORC(FormatFactory &)
void registerORCSchemaReader(FormatFactory &)
{
}
*/
}

View File

@ -1,9 +1,9 @@
#pragma once
#include "IO/ReadBufferFromString.h"
#include "config.h"
#if USE_ORC
# include <Formats/FormatSettings.h>
# include <IO/ReadBufferFromString.h>
# include <Processors/Formats/IInputFormat.h>
# include <Processors/Formats/ISchemaReader.h>
# include <orc/OrcFile.hh>
@ -44,10 +44,10 @@ std::unique_ptr<orc::InputStream> asORCInputStreamLoadIntoMemory(ReadBuffer & in
class ORCColumnToCHColumn;
class ORCBlockInputFormat : public IInputFormat
class NativeORCBlockInputFormat : public IInputFormat
{
public:
ORCBlockInputFormat(ReadBuffer & in_, Block header_, const FormatSettings & format_settings_);
NativeORCBlockInputFormat(ReadBuffer & in_, Block header_, const FormatSettings & format_settings_);
String getName() const override { return "ORCBlockInputFormat"; }
@ -87,10 +87,10 @@ private:
std::atomic<int> is_stopped{0};
};
class ORCSchemaReader : public ISchemaReader
class NativeORCSchemaReader : public ISchemaReader
{
public:
ORCSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_);
NativeORCSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_);
NamesAndTypesList readSchema() override;

View File

@ -1,16 +1,17 @@
#include "ORCBlockInputFormat.h"
#include <boost/algorithm/string/case_conv.hpp>
#if USE_ORC
#include <Formats/FormatFactory.h>
#include <Formats/SchemaInferenceUtils.h>
#include <IO/ReadBufferFromMemory.h>
#include <IO/WriteHelpers.h>
#include <IO/copyData.h>
#include "ArrowBufferedStreams.h"
#include "ArrowColumnToCHColumn.h"
#include "ArrowFieldIndexUtil.h"
#include <DataTypes/NestedUtils.h>
#if USE_ORC
# include <DataTypes/NestedUtils.h>
# include <Formats/FormatFactory.h>
# include <Formats/SchemaInferenceUtils.h>
# include <IO/ReadBufferFromMemory.h>
# include <IO/WriteHelpers.h>
# include <IO/copyData.h>
# include <boost/algorithm/string/case_conv.hpp>
# include "ArrowBufferedStreams.h"
# include "ArrowColumnToCHColumn.h"
# include "ArrowFieldIndexUtil.h"
# include "NativeORCBlockInputFormat.h"
namespace DB
{
@ -156,31 +157,34 @@ NamesAndTypesList ORCSchemaReader::readSchema()
return getNamesAndRecursivelyNullableTypes(header);
return header.getNamesAndTypesList();}
void registerInputFormatDeprecatedORC(FormatFactory & factory)
void registerInputFormatORC(FormatFactory & factory)
{
factory.registerInputFormat(
"DeprecatedORC",
[](ReadBuffer &buf,
const Block &sample,
const RowInputFormatParams &,
const FormatSettings & settings)
{
"ORC",
[](ReadBuffer & buf, const Block & sample, const RowInputFormatParams &, const FormatSettings & settings)
{
if (settings.orc.use_fast_decoder)
return std::make_shared<NativeORCBlockInputFormat>(buf, sample, settings);
else
return std::make_shared<ORCBlockInputFormat>(buf, sample, settings);
});
factory.markFormatSupportsSubsetOfColumns("DeprecatedORC");
});
factory.markFormatSupportsSubsetOfColumns("ORC");
}
void registerDeprecatedORCSchemaReader(FormatFactory & factory)
void registerORCSchemaReader(FormatFactory & factory)
{
factory.registerSchemaReader(
"DeprecatedORC",
"ORC",
[](ReadBuffer & buf, const FormatSettings & settings)
{
return std::make_shared<ORCSchemaReader>(buf, settings);
if (settings.orc.use_fast_decoder)
return std::make_shared<NativeORCSchemaReader>(buf, settings);
else
return std::make_shared<ORCSchemaReader>(buf, settings);
}
);
factory.registerAdditionalInfoForSchemaCacheGetter("DeprecatedORC", [](const FormatSettings & settings)
factory.registerAdditionalInfoForSchemaCacheGetter("ORC", [](const FormatSettings & settings)
{
return fmt::format("schema_inference_make_columns_nullable={}", settings.schema_inference_make_columns_nullable);
});
@ -192,11 +196,11 @@ void registerDeprecatedORCSchemaReader(FormatFactory & factory)
namespace DB
{
class FormatFactory;
void registerInputFormatDeprecatedORC(FormatFactory &)
void registerInputFormatORC(FormatFactory &)
{
}
void registerDeprecatedORCSchemaReader(FormatFactory &)
void registerORCSchemaReader(FormatFactory &)
{
}
}

View File

@ -12,7 +12,7 @@ int main()
// String path = "/data1/clickhouse_official/data/user_files/bigolive_audience_stats_orc.orc";
{
ReadBufferFromFile in(path);
ORCSchemaReader schema_reader(in, {});
NativeORCSchemaReader schema_reader(in, {});
auto schema = schema_reader.readSchema();
std::cout << "schema:" << schema.toString() << std::endl;
}
@ -28,7 +28,7 @@ int main()
content.resize(out.count());
ReadBufferFromString in2(content);
ORCSchemaReader schema_reader(in2, {});
NativeORCSchemaReader schema_reader(in2, {});
auto schema = schema_reader.readSchema();
std::cout << "schema:" << schema.toString() << std::endl;
}