Allow controlling compression in Parquet/ORC/Arrow output formats, support more compression methods for input formats

avogar 2023-03-01 21:27:46 +00:00
parent 5b42383730
commit 5ab5902f38
10 changed files with 171 additions and 10 deletions
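As a usage sketch (not part of the diff; the numbers() source and file names are illustrative, and INTO OUTFILE assumes clickhouse-client or clickhouse-local), the three new output settings are applied like any other format setting:

SET output_format_parquet_compression_method = 'zstd';
SELECT number FROM numbers(1000) INTO OUTFILE 'data.parquet' FORMAT Parquet;

SET output_format_arrow_compression_method = 'lz4_frame';
SELECT number FROM numbers(1000) INTO OUTFILE 'data.arrow' FORMAT Arrow;

SET output_format_orc_compression_method = 'zstd';
SELECT number FROM numbers(1000) INTO OUTFILE 'data.orc' FORMAT ORC;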


@ -115,6 +115,13 @@ configure_file("${ORC_SOURCE_SRC_DIR}/Adaptor.hh.in" "${ORC_BUILD_INCLUDE_DIR}/A
# ARROW_ORC + adapters/orc/CMakefiles
set(ORC_SRCS
"${CMAKE_CURRENT_BINARY_DIR}/orc_proto.pb.h"
"${ORC_SOURCE_SRC_DIR}/sargs/ExpressionTree.cc"
"${ORC_SOURCE_SRC_DIR}/sargs/Literal.cc"
"${ORC_SOURCE_SRC_DIR}/sargs/PredicateLeaf.cc"
"${ORC_SOURCE_SRC_DIR}/sargs/SargsApplier.cc"
"${ORC_SOURCE_SRC_DIR}/sargs/SearchArgument.cc"
"${ORC_SOURCE_SRC_DIR}/sargs/TruthValue.cc"
"${ORC_SOURCE_SRC_DIR}/Exceptions.cc"
"${ORC_SOURCE_SRC_DIR}/OrcFile.cc"
"${ORC_SOURCE_SRC_DIR}/Reader.cc"
@ -129,13 +136,20 @@ set(ORC_SRCS
"${ORC_SOURCE_SRC_DIR}/MemoryPool.cc"
"${ORC_SOURCE_SRC_DIR}/RLE.cc"
"${ORC_SOURCE_SRC_DIR}/RLEv1.cc"
"${ORC_SOURCE_SRC_DIR}/RLEv2.cc"
"${ORC_SOURCE_SRC_DIR}/RleDecoderV2.cc"
"${ORC_SOURCE_SRC_DIR}/RleEncoderV2.cc"
"${ORC_SOURCE_SRC_DIR}/RLEV2Util.cc"
"${ORC_SOURCE_SRC_DIR}/Statistics.cc"
"${ORC_SOURCE_SRC_DIR}/StripeStream.cc"
"${ORC_SOURCE_SRC_DIR}/Timezone.cc"
"${ORC_SOURCE_SRC_DIR}/TypeImpl.cc"
"${ORC_SOURCE_SRC_DIR}/Vector.cc"
"${ORC_SOURCE_SRC_DIR}/Writer.cc"
"${ORC_SOURCE_SRC_DIR}/Adaptor.cc"
"${ORC_SOURCE_SRC_DIR}/BloomFilter.cc"
"${ORC_SOURCE_SRC_DIR}/Murmur3.cc"
"${ORC_SOURCE_SRC_DIR}/BlockBuffer.cc"
"${ORC_SOURCE_SRC_DIR}/wrap/orc-proto-wrapper.cc"
"${ORC_SOURCE_SRC_DIR}/io/InputStream.cc"
"${ORC_SOURCE_SRC_DIR}/io/OutputStream.cc"
"${ORC_ADDITION_SOURCE_DIR}/orc_proto.pb.cc"
@ -358,6 +372,9 @@ SET(ARROW_SRCS "${LIBRARY_DIR}/util/compression_zlib.cc" ${ARROW_SRCS})
add_definitions(-DARROW_WITH_ZSTD)
SET(ARROW_SRCS "${LIBRARY_DIR}/util/compression_zstd.cc" ${ARROW_SRCS})
add_definitions(-DARROW_WITH_BROTLI)
SET(ARROW_SRCS "${LIBRARY_DIR}/util/compression_brotli.cc" ${ARROW_SRCS})
add_library(_arrow ${ARROW_SRCS})
@ -372,6 +389,7 @@ target_link_libraries(_arrow PRIVATE
ch_contrib::snappy
ch_contrib::zlib
ch_contrib::zstd
ch_contrib::brotli
)
target_link_libraries(_arrow PUBLIC _orc)

contrib/orc (vendored submodule), 2 lines changed

@ -1 +1 @@
Subproject commit f9a393ed2433a60034795284f82d093b348f2102
Subproject commit c5d7755ba0b9a95631c8daea4d094101f26ec761


@ -857,6 +857,7 @@ class IColumn;
M(Bool, output_format_parquet_string_as_string, false, "Use Parquet String type instead of Binary for String columns.", 0) \
M(Bool, output_format_parquet_fixed_string_as_fixed_byte_array, true, "Use Parquet FIXED_LENGTH_BYTE_ARRAY type instead of Binary for FixedString columns.", 0) \
M(ParquetVersion, output_format_parquet_version, "2.latest", "Parquet format version for output format. Supported versions: 1.0, 2.4, 2.6 and 2.latest (default)", 0) \
M(ParquetCompression, output_format_parquet_compression_method, "snappy", "Compression method for Parquet output format. Supported codecs: snappy, lz4, brotli, zstd, gzip, none (uncompressed)", 0) \
M(String, output_format_avro_codec, "", "Compression codec used for output. Possible values: 'null', 'deflate', 'snappy'.", 0) \
M(UInt64, output_format_avro_sync_interval, 16 * 1024, "Sync interval in bytes.", 0) \
M(String, output_format_avro_string_column_pattern, "", "For Avro format: regexp of String columns to select as AVRO string.", 0) \
@ -899,8 +900,10 @@ class IColumn;
M(Bool, output_format_arrow_low_cardinality_as_dictionary, false, "Enable output LowCardinality type as Dictionary Arrow type", 0) \
M(Bool, output_format_arrow_string_as_string, false, "Use Arrow String type instead of Binary for String columns", 0) \
M(Bool, output_format_arrow_fixed_string_as_fixed_byte_array, true, "Use Arrow FIXED_SIZE_BINARY type instead of Binary for FixedString columns.", 0) \
M(ArrowCompression, output_format_arrow_compression_method, "none", "Compression method for Arrow output format. Supported codecs: lz4_frame, zstd, none (uncompressed)", 0) \
\
M(Bool, output_format_orc_string_as_string, false, "Use ORC String type instead of Binary for String columns", 0) \
M(ORCCompression, output_format_orc_compression_method, "none", "Compression method for ORC output format. Supported codecs: lz4, snappy, zlib, zstd, none (uncompressed)", 0) \
\
M(EnumComparingMode, format_capn_proto_enum_comparising_mode, FormatSettings::EnumComparingMode::BY_VALUES, "How to map ClickHouse Enum and CapnProto Enum", 0) \
\


@ -158,7 +158,7 @@ IMPLEMENT_SETTING_ENUM(EscapingRule, ErrorCodes::BAD_ARGUMENTS,
{"XML", FormatSettings::EscapingRule::XML},
{"Raw", FormatSettings::EscapingRule::Raw}})
IMPLEMENT_SETTING_ENUM(MsgPackUUIDRepresentation , ErrorCodes::BAD_ARGUMENTS,
IMPLEMENT_SETTING_ENUM(MsgPackUUIDRepresentation, ErrorCodes::BAD_ARGUMENTS,
{{"bin", FormatSettings::MsgPackUUIDRepresentation::BIN},
{"str", FormatSettings::MsgPackUUIDRepresentation::STR},
{"ext", FormatSettings::MsgPackUUIDRepresentation::EXT}})
@ -172,11 +172,30 @@ IMPLEMENT_SETTING_ENUM(LocalFSReadMethod, ErrorCodes::BAD_ARGUMENTS,
{"pread", LocalFSReadMethod::pread},
{"read", LocalFSReadMethod::read}})
IMPLEMENT_SETTING_ENUM_WITH_RENAME(ParquetVersion, ErrorCodes::BAD_ARGUMENTS,
{{"1.0", FormatSettings::ParquetVersion::V1_0},
{"2.4", FormatSettings::ParquetVersion::V2_4},
{"2.6", FormatSettings::ParquetVersion::V2_6},
{"2.latest", FormatSettings::ParquetVersion::V2_LATEST}})
IMPLEMENT_SETTING_ENUM(ParquetCompression, ErrorCodes::BAD_ARGUMENTS,
{{"none", FormatSettings::ParquetCompression::NONE},
{"snappy", FormatSettings::ParquetCompression::SNAPPY},
{"zstd", FormatSettings::ParquetCompression::ZSTD},
{"gzip", FormatSettings::ParquetCompression::GZIP},
{"lz4", FormatSettings::ParquetCompression::LZ4},
{"brotli", FormatSettings::ParquetCompression::BROTLI}})
IMPLEMENT_SETTING_ENUM(ArrowCompression, ErrorCodes::BAD_ARGUMENTS,
{{"none", FormatSettings::ArrowCompression::NONE},
{"lz4_frame", FormatSettings::ArrowCompression::LZ4_FRAME},
{"zstd", FormatSettings::ArrowCompression::ZSTD}})
IMPLEMENT_SETTING_ENUM(ORCCompression, ErrorCodes::BAD_ARGUMENTS,
{{"none", FormatSettings::ORCCompression::NONE},
{"snappy", FormatSettings::ORCCompression::SNAPPY},
{"zstd", FormatSettings::ORCCompression::ZSTD},
{"zlib", FormatSettings::ORCCompression::ZLIB},
{"lz4", FormatSettings::ORCCompression::LZ4}})
}
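A behavioral note implied by the enum registrations above: a value outside the declared set is rejected when the setting is assigned, with ErrorCodes::BAD_ARGUMENTS. Illustrative sketch:

SET output_format_parquet_compression_method = 'snappy';   -- accepted, declared above
SET output_format_parquet_compression_method = 'deflate';  -- rejected with BAD_ARGUMENTS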


@ -194,6 +194,12 @@ DECLARE_SETTING_ENUM_WITH_RENAME(EscapingRule, FormatSettings::EscapingRule)
DECLARE_SETTING_ENUM_WITH_RENAME(MsgPackUUIDRepresentation, FormatSettings::MsgPackUUIDRepresentation)
DECLARE_SETTING_ENUM_WITH_RENAME(ParquetCompression, FormatSettings::ParquetCompression)
DECLARE_SETTING_ENUM_WITH_RENAME(ArrowCompression, FormatSettings::ArrowCompression)
DECLARE_SETTING_ENUM_WITH_RENAME(ORCCompression, FormatSettings::ORCCompression)
enum class Dialect
{
clickhouse,


@ -118,6 +118,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.parquet.output_string_as_string = settings.output_format_parquet_string_as_string;
format_settings.parquet.output_fixed_string_as_fixed_byte_array = settings.output_format_parquet_fixed_string_as_fixed_byte_array;
format_settings.parquet.max_block_size = settings.input_format_parquet_max_block_size;
format_settings.parquet.output_compression_method = settings.output_format_parquet_compression_method;
format_settings.pretty.charset = settings.output_format_pretty_grid_charset.toString() == "ASCII" ? FormatSettings::Pretty::Charset::ASCII : FormatSettings::Pretty::Charset::UTF8;
format_settings.pretty.color = settings.output_format_pretty_color;
format_settings.pretty.max_column_pad_width = settings.output_format_pretty_max_column_pad_width;
@ -158,6 +159,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.arrow.case_insensitive_column_matching = settings.input_format_arrow_case_insensitive_column_matching;
format_settings.arrow.output_string_as_string = settings.output_format_arrow_string_as_string;
format_settings.arrow.output_fixed_string_as_fixed_byte_array = settings.output_format_arrow_fixed_string_as_fixed_byte_array;
format_settings.arrow.output_compression_method = settings.output_format_arrow_compression_method;
format_settings.orc.import_nested = settings.input_format_orc_import_nested;
format_settings.orc.allow_missing_columns = settings.input_format_orc_allow_missing_columns;
format_settings.orc.row_batch_size = settings.input_format_orc_row_batch_size;
@ -168,6 +170,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.orc.skip_columns_with_unsupported_types_in_schema_inference = settings.input_format_orc_skip_columns_with_unsupported_types_in_schema_inference;
format_settings.orc.case_insensitive_column_matching = settings.input_format_orc_case_insensitive_column_matching;
format_settings.orc.output_string_as_string = settings.output_format_orc_string_as_string;
format_settings.orc.output_compression_method = settings.output_format_orc_compression_method;
format_settings.defaults_for_omitted_fields = settings.input_format_defaults_for_omitted_fields;
format_settings.capn_proto.enum_comparing_mode = settings.format_capn_proto_enum_comparising_mode;
format_settings.capn_proto.skip_fields_with_unsupported_types_in_schema_inference = settings.input_format_capn_proto_skip_fields_with_unsupported_types_in_schema_inference;


@ -86,6 +86,13 @@ struct FormatSettings
UInt64 max_parser_depth = DBMS_DEFAULT_MAX_PARSER_DEPTH;
enum class ArrowCompression
{
NONE,
LZ4_FRAME,
ZSTD
};
struct
{
UInt64 row_group_size = 1000000;
@ -96,6 +103,7 @@ struct FormatSettings
bool case_insensitive_column_matching = false;
bool output_string_as_string = false;
bool output_fixed_string_as_fixed_byte_array = true;
ArrowCompression output_compression_method = ArrowCompression::NONE;
} arrow;
struct
@ -183,6 +191,16 @@ struct FormatSettings
V2_LATEST,
};
enum class ParquetCompression
{
NONE,
SNAPPY,
ZSTD,
LZ4,
GZIP,
BROTLI,
};
struct
{
UInt64 row_group_size = 1000000;
@ -195,6 +213,7 @@ struct FormatSettings
bool output_fixed_string_as_fixed_byte_array = true;
UInt64 max_block_size = 8192;
ParquetVersion output_version;
ParquetCompression output_compression_method = ParquetCompression::SNAPPY;
} parquet;
struct Pretty
@ -276,6 +295,15 @@ struct FormatSettings
bool accurate_types_of_literals = true;
} values;
enum class ORCCompression
{
NONE,
LZ4,
SNAPPY,
ZSTD,
ZLIB,
};
struct
{
bool import_nested = false;
@ -285,6 +313,7 @@ struct FormatSettings
bool case_insensitive_column_matching = false;
std::unordered_set<int> skip_stripes = {};
bool output_string_as_string = false;
ORCCompression output_compression_method = ORCCompression::NONE;
} orc;
/// For capnProto format we should determine how to


@ -8,6 +8,7 @@
#include <arrow/result.h>
#include "ArrowBufferedStreams.h"
#include "CHColumnToArrowColumn.h"
#include "config.h"
namespace DB
@ -17,6 +18,25 @@ namespace ErrorCodes
extern const int UNKNOWN_EXCEPTION;
}
namespace
{
arrow::Compression::type getArrowCompression(FormatSettings::ArrowCompression method)
{
switch (method)
{
case FormatSettings::ArrowCompression::NONE:
return arrow::Compression::type::UNCOMPRESSED;
case FormatSettings::ArrowCompression::ZSTD:
return arrow::Compression::type::ZSTD;
case FormatSettings::ArrowCompression::LZ4_FRAME:
return arrow::Compression::type::LZ4_FRAME;
}
}
}
ArrowBlockOutputFormat::ArrowBlockOutputFormat(WriteBuffer & out_, const Block & header_, bool stream_, const FormatSettings & format_settings_)
: IOutputFormat(header_, out_)
, stream{stream_}
@ -78,12 +98,14 @@ void ArrowBlockOutputFormat::prepareWriter(const std::shared_ptr<arrow::Schema>
{
arrow_ostream = std::make_shared<ArrowBufferedOutputStream>(out);
arrow::Result<std::shared_ptr<arrow::ipc::RecordBatchWriter>> writer_status;
arrow::ipc::IpcWriteOptions options = arrow::ipc::IpcWriteOptions::Defaults();
options.codec = *arrow::util::Codec::Create(getArrowCompression(format_settings.arrow.output_compression_method));
// TODO: should we use arrow::ipc::IpcOptions::alignment?
if (stream)
writer_status = arrow::ipc::MakeStreamWriter(arrow_ostream.get(), schema);
writer_status = arrow::ipc::MakeStreamWriter(arrow_ostream.get(), schema, options);
else
writer_status = arrow::ipc::MakeFileWriter(arrow_ostream.get(), schema);
writer_status = arrow::ipc::MakeFileWriter(arrow_ostream.get(), schema, options);
if (!writer_status.ok())
throw Exception(ErrorCodes::UNKNOWN_EXCEPTION,

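Because the same IpcWriteOptions are passed to both MakeStreamWriter and MakeFileWriter, the new setting covers both the Arrow and ArrowStream output formats. A usage sketch (file name illustrative):

SET output_format_arrow_compression_method = 'lz4_frame';
SELECT number FROM numbers(1000) INTO OUTFILE 'batches.arrows' FORMAT ArrowStream;
-- 'zstd' and 'none' are the other accepted values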

@ -22,12 +22,42 @@
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include "config.h"
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_COLUMN;
extern const int NOT_IMPLEMENTED;
}
namespace
{
orc::CompressionKind getORCCompression(FormatSettings::ORCCompression method)
{
if (method == FormatSettings::ORCCompression::NONE)
return orc::CompressionKind::CompressionKind_NONE;
#if USE_SNAPPY
if (method == FormatSettings::ORCCompression::SNAPPY)
return orc::CompressionKind::CompressionKind_SNAPPY;
#endif
if (method == FormatSettings::ORCCompression::ZSTD)
return orc::CompressionKind::CompressionKind_ZSTD;
if (method == FormatSettings::ORCCompression::LZ4)
return orc::CompressionKind::CompressionKind_LZ4;
if (method == FormatSettings::ORCCompression::ZLIB)
return orc::CompressionKind::CompressionKind_ZLIB;
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unsupported compression method");
}
}
ORCOutputStream::ORCOutputStream(WriteBuffer & out_) : out(out_) {}
@ -529,7 +559,7 @@ void ORCBlockOutputFormat::prepareWriter()
{
const Block & header = getPort(PortKind::Main).getHeader();
schema = orc::createStructType();
options.setCompression(orc::CompressionKind::CompressionKind_NONE);
options.setCompression(getORCCompression(format_settings.orc.output_compression_method));
size_t columns_count = header.columns();
for (size_t i = 0; i != columns_count; ++i)
schema->addStructField(header.safeGetByPosition(i).name, getORCType(recursiveRemoveLowCardinality(data_types[i])));

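Usage sketch for the ORC change (file name illustrative). Note the #if USE_SNAPPY guard above: 'snappy' resolves to a real codec only in builds with Snappy support; otherwise the NOT_IMPLEMENTED exception is raised. The default stays 'none', matching the previously hard-coded CompressionKind_NONE:

SET output_format_orc_compression_method = 'zlib';
SELECT number FROM numbers(1000) INTO OUTFILE 'data.orc' FORMAT ORC;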

@ -16,6 +16,9 @@ namespace ErrorCodes
extern const int UNKNOWN_EXCEPTION;
}
namespace
{
static parquet::ParquetVersion::type getParquetVersion(const FormatSettings & settings)
{
switch (settings.parquet.output_version)
@ -31,6 +34,36 @@ static parquet::ParquetVersion::type getParquetVersion(const FormatSettings & se
}
}
parquet::Compression::type getParquetCompression(FormatSettings::ParquetCompression method)
{
if (method == FormatSettings::ParquetCompression::NONE)
return parquet::Compression::type::UNCOMPRESSED;
#if USE_SNAPPY
if (method == FormatSettings::ParquetCompression::SNAPPY)
return parquet::Compression::type::SNAPPY;
#endif
#if USE_BROTLI
if (method == FormatSettings::ParquetCompression::BROTLI)
return parquet::Compression::type::BROTLI;
#endif
if (method == FormatSettings::ParquetCompression::ZSTD)
return parquet::Compression::type::ZSTD;
if (method == FormatSettings::ParquetCompression::LZ4)
return parquet::Compression::type::LZ4;
if (method == FormatSettings::ParquetCompression::GZIP)
return parquet::Compression::type::GZIP;
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unsupported compression method");
}
}
ParquetBlockOutputFormat::ParquetBlockOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_)
: IOutputFormat(header_, out_), format_settings{format_settings_}
{
@ -60,9 +93,7 @@ void ParquetBlockOutputFormat::consume(Chunk chunk)
parquet::WriterProperties::Builder builder;
builder.version(getParquetVersion(format_settings));
#if USE_SNAPPY
builder.compression(parquet::Compression::SNAPPY);
#endif
builder.compression(getParquetCompression(format_settings.parquet.output_compression_method));
auto props = builder.build();
auto status = parquet::arrow::FileWriter::Open(
*arrow_table->schema(),
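Usage sketch for the Parquet change (file name illustrative). The default remains 'snappy', matching the previously hard-coded builder.compression(parquet::Compression::SNAPPY); 'snappy' and 'brotli' require builds with USE_SNAPPY / USE_BROTLI, otherwise NOT_IMPLEMENTED is thrown:

SET output_format_parquet_compression_method = 'gzip';
SELECT number FROM numbers(1000) INTO OUTFILE 'data.parquet' FORMAT Parquet;
-- the codec is recorded in the Parquet file metadata, so reading the file back should need no extra setting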