diff --git a/contrib/arrow-cmake/CMakeLists.txt b/contrib/arrow-cmake/CMakeLists.txt index ae6f270a768..4181f916d63 100644 --- a/contrib/arrow-cmake/CMakeLists.txt +++ b/contrib/arrow-cmake/CMakeLists.txt @@ -115,6 +115,13 @@ configure_file("${ORC_SOURCE_SRC_DIR}/Adaptor.hh.in" "${ORC_BUILD_INCLUDE_DIR}/A # ARROW_ORC + adapters/orc/CMakefiles set(ORC_SRCS + "${CMAKE_CURRENT_BINARY_DIR}/orc_proto.pb.h" + "${ORC_SOURCE_SRC_DIR}/sargs/ExpressionTree.cc" + "${ORC_SOURCE_SRC_DIR}/sargs/Literal.cc" + "${ORC_SOURCE_SRC_DIR}/sargs/PredicateLeaf.cc" + "${ORC_SOURCE_SRC_DIR}/sargs/SargsApplier.cc" + "${ORC_SOURCE_SRC_DIR}/sargs/SearchArgument.cc" + "${ORC_SOURCE_SRC_DIR}/sargs/TruthValue.cc" "${ORC_SOURCE_SRC_DIR}/Exceptions.cc" "${ORC_SOURCE_SRC_DIR}/OrcFile.cc" "${ORC_SOURCE_SRC_DIR}/Reader.cc" @@ -129,13 +136,20 @@ set(ORC_SRCS "${ORC_SOURCE_SRC_DIR}/MemoryPool.cc" "${ORC_SOURCE_SRC_DIR}/RLE.cc" "${ORC_SOURCE_SRC_DIR}/RLEv1.cc" - "${ORC_SOURCE_SRC_DIR}/RLEv2.cc" + "${ORC_SOURCE_SRC_DIR}/RleDecoderV2.cc" + "${ORC_SOURCE_SRC_DIR}/RleEncoderV2.cc" + "${ORC_SOURCE_SRC_DIR}/RLEV2Util.cc" "${ORC_SOURCE_SRC_DIR}/Statistics.cc" "${ORC_SOURCE_SRC_DIR}/StripeStream.cc" "${ORC_SOURCE_SRC_DIR}/Timezone.cc" "${ORC_SOURCE_SRC_DIR}/TypeImpl.cc" "${ORC_SOURCE_SRC_DIR}/Vector.cc" "${ORC_SOURCE_SRC_DIR}/Writer.cc" + "${ORC_SOURCE_SRC_DIR}/Adaptor.cc" + "${ORC_SOURCE_SRC_DIR}/BloomFilter.cc" + "${ORC_SOURCE_SRC_DIR}/Murmur3.cc" + "${ORC_SOURCE_SRC_DIR}/BlockBuffer.cc" + "${ORC_SOURCE_SRC_DIR}/wrap/orc-proto-wrapper.cc" "${ORC_SOURCE_SRC_DIR}/io/InputStream.cc" "${ORC_SOURCE_SRC_DIR}/io/OutputStream.cc" "${ORC_ADDITION_SOURCE_DIR}/orc_proto.pb.cc" @@ -358,6 +372,9 @@ SET(ARROW_SRCS "${LIBRARY_DIR}/util/compression_zlib.cc" ${ARROW_SRCS}) add_definitions(-DARROW_WITH_ZSTD) SET(ARROW_SRCS "${LIBRARY_DIR}/util/compression_zstd.cc" ${ARROW_SRCS}) +add_definitions(-DARROW_WITH_BROTLI) +SET(ARROW_SRCS "${LIBRARY_DIR}/util/compression_brotli.cc" ${ARROW_SRCS}) + add_library(_arrow ${ARROW_SRCS}) @@ -372,6 +389,7 @@ target_link_libraries(_arrow PRIVATE ch_contrib::snappy ch_contrib::zlib ch_contrib::zstd + ch_contrib::brotli ) target_link_libraries(_arrow PUBLIC _orc) diff --git a/contrib/orc b/contrib/orc index f9a393ed243..c5d7755ba0b 160000 --- a/contrib/orc +++ b/contrib/orc @@ -1 +1 @@ -Subproject commit f9a393ed2433a60034795284f82d093b348f2102 +Subproject commit c5d7755ba0b9a95631c8daea4d094101f26ec761 diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 3908254b6f1..8d3e787f42c 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -857,6 +857,7 @@ class IColumn; M(Bool, output_format_parquet_string_as_string, false, "Use Parquet String type instead of Binary for String columns.", 0) \ M(Bool, output_format_parquet_fixed_string_as_fixed_byte_array, true, "Use Parquet FIXED_LENGTH_BYTE_ARRAY type instead of Binary for FixedString columns.", 0) \ M(ParquetVersion, output_format_parquet_version, "2.latest", "Parquet format version for output format. Supported versions: 1.0, 2.4, 2.6 and 2.latest (default)", 0) \ + M(ParquetCompression, output_format_parquet_compression_method, "snappy", "Compression method for Parquet output format. Supported codecs: snappy, lz4, brotli, zstd, gzip, none (uncompressed)", 0) \ M(String, output_format_avro_codec, "", "Compression codec used for output. Possible values: 'null', 'deflate', 'snappy'.", 0) \ M(UInt64, output_format_avro_sync_interval, 16 * 1024, "Sync interval in bytes.", 0) \ M(String, output_format_avro_string_column_pattern, "", "For Avro format: regexp of String columns to select as AVRO string.", 0) \ @@ -899,8 +900,10 @@ class IColumn; M(Bool, output_format_arrow_low_cardinality_as_dictionary, false, "Enable output LowCardinality type as Dictionary Arrow type", 0) \ M(Bool, output_format_arrow_string_as_string, false, "Use Arrow String type instead of Binary for String columns", 0) \ M(Bool, output_format_arrow_fixed_string_as_fixed_byte_array, true, "Use Arrow FIXED_SIZE_BINARY type instead of Binary for FixedString columns.", 0) \ + M(ArrowCompression, output_format_arrow_compression_method, "none", "Compression method for Arrow output format. Supported codecs: lz4_frame, zstd, none (uncompressed)", 0) \ \ M(Bool, output_format_orc_string_as_string, false, "Use ORC String type instead of Binary for String columns", 0) \ + M(ORCCompression, output_format_orc_compression_method, "none", "Compression method for ORC output format. Supported codecs: lz4, snappy, zlib, zstd, none (uncompressed)", 0) \ \ M(EnumComparingMode, format_capn_proto_enum_comparising_mode, FormatSettings::EnumComparingMode::BY_VALUES, "How to map ClickHouse Enum and CapnProto Enum", 0) \ \ diff --git a/src/Core/SettingsEnums.cpp b/src/Core/SettingsEnums.cpp index 9e1ab585bb0..91572aa1b3f 100644 --- a/src/Core/SettingsEnums.cpp +++ b/src/Core/SettingsEnums.cpp @@ -158,7 +158,7 @@ IMPLEMENT_SETTING_ENUM(EscapingRule, ErrorCodes::BAD_ARGUMENTS, {"XML", FormatSettings::EscapingRule::XML}, {"Raw", FormatSettings::EscapingRule::Raw}}) -IMPLEMENT_SETTING_ENUM(MsgPackUUIDRepresentation , ErrorCodes::BAD_ARGUMENTS, +IMPLEMENT_SETTING_ENUM(MsgPackUUIDRepresentation, ErrorCodes::BAD_ARGUMENTS, {{"bin", FormatSettings::MsgPackUUIDRepresentation::BIN}, {"str", FormatSettings::MsgPackUUIDRepresentation::STR}, {"ext", FormatSettings::MsgPackUUIDRepresentation::EXT}}) @@ -172,11 +172,30 @@ IMPLEMENT_SETTING_ENUM(LocalFSReadMethod, ErrorCodes::BAD_ARGUMENTS, {"pread", LocalFSReadMethod::pread}, {"read", LocalFSReadMethod::read}}) - IMPLEMENT_SETTING_ENUM_WITH_RENAME(ParquetVersion, ErrorCodes::BAD_ARGUMENTS, {{"1.0", FormatSettings::ParquetVersion::V1_0}, {"2.4", FormatSettings::ParquetVersion::V2_4}, {"2.6", FormatSettings::ParquetVersion::V2_6}, {"2.latest", FormatSettings::ParquetVersion::V2_LATEST}}) +IMPLEMENT_SETTING_ENUM(ParquetCompression, ErrorCodes::BAD_ARGUMENTS, + {{"none", FormatSettings::ParquetCompression::NONE}, + {"snappy", FormatSettings::ParquetCompression::SNAPPY}, + {"zstd", FormatSettings::ParquetCompression::ZSTD}, + {"gzip", FormatSettings::ParquetCompression::GZIP}, + {"lz4", FormatSettings::ParquetCompression::LZ4}, + {"brotli", FormatSettings::ParquetCompression::BROTLI}}) + +IMPLEMENT_SETTING_ENUM(ArrowCompression, ErrorCodes::BAD_ARGUMENTS, + {{"none", FormatSettings::ArrowCompression::NONE}, + {"lz4_frame", FormatSettings::ArrowCompression::LZ4_FRAME}, + {"zstd", FormatSettings::ArrowCompression::ZSTD}}) + +IMPLEMENT_SETTING_ENUM(ORCCompression, ErrorCodes::BAD_ARGUMENTS, + {{"none", FormatSettings::ORCCompression::NONE}, + {"snappy", FormatSettings::ORCCompression::SNAPPY}, + {"zstd", FormatSettings::ORCCompression::ZSTD}, + {"zlib", FormatSettings::ORCCompression::ZLIB}, + {"lz4", FormatSettings::ORCCompression::LZ4}}) + } diff --git a/src/Core/SettingsEnums.h b/src/Core/SettingsEnums.h index 139a04f3a5a..14e952bbd65 100644 --- a/src/Core/SettingsEnums.h +++ b/src/Core/SettingsEnums.h @@ -194,6 +194,12 @@ DECLARE_SETTING_ENUM_WITH_RENAME(EscapingRule, FormatSettings::EscapingRule) DECLARE_SETTING_ENUM_WITH_RENAME(MsgPackUUIDRepresentation, FormatSettings::MsgPackUUIDRepresentation) +DECLARE_SETTING_ENUM_WITH_RENAME(ParquetCompression, FormatSettings::ParquetCompression) + +DECLARE_SETTING_ENUM_WITH_RENAME(ArrowCompression, FormatSettings::ArrowCompression) + +DECLARE_SETTING_ENUM_WITH_RENAME(ORCCompression, FormatSettings::ORCCompression) + enum class Dialect { clickhouse, diff --git a/src/Formats/FormatFactory.cpp b/src/Formats/FormatFactory.cpp index aca3166a8c4..7f14810b260 100644 --- a/src/Formats/FormatFactory.cpp +++ b/src/Formats/FormatFactory.cpp @@ -118,6 +118,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings) format_settings.parquet.output_string_as_string = settings.output_format_parquet_string_as_string; format_settings.parquet.output_fixed_string_as_fixed_byte_array = settings.output_format_parquet_fixed_string_as_fixed_byte_array; format_settings.parquet.max_block_size = settings.input_format_parquet_max_block_size; + format_settings.parquet.output_compression_method = settings.output_format_parquet_compression_method; format_settings.pretty.charset = settings.output_format_pretty_grid_charset.toString() == "ASCII" ? FormatSettings::Pretty::Charset::ASCII : FormatSettings::Pretty::Charset::UTF8; format_settings.pretty.color = settings.output_format_pretty_color; format_settings.pretty.max_column_pad_width = settings.output_format_pretty_max_column_pad_width; @@ -158,6 +159,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings) format_settings.arrow.case_insensitive_column_matching = settings.input_format_arrow_case_insensitive_column_matching; format_settings.arrow.output_string_as_string = settings.output_format_arrow_string_as_string; format_settings.arrow.output_fixed_string_as_fixed_byte_array = settings.output_format_arrow_fixed_string_as_fixed_byte_array; + format_settings.arrow.output_compression_method = settings.output_format_arrow_compression_method; format_settings.orc.import_nested = settings.input_format_orc_import_nested; format_settings.orc.allow_missing_columns = settings.input_format_orc_allow_missing_columns; format_settings.orc.row_batch_size = settings.input_format_orc_row_batch_size; @@ -168,6 +170,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings) format_settings.orc.skip_columns_with_unsupported_types_in_schema_inference = settings.input_format_orc_skip_columns_with_unsupported_types_in_schema_inference; format_settings.orc.case_insensitive_column_matching = settings.input_format_orc_case_insensitive_column_matching; format_settings.orc.output_string_as_string = settings.output_format_orc_string_as_string; + format_settings.orc.output_compression_method = settings.output_format_orc_compression_method; format_settings.defaults_for_omitted_fields = settings.input_format_defaults_for_omitted_fields; format_settings.capn_proto.enum_comparing_mode = settings.format_capn_proto_enum_comparising_mode; format_settings.capn_proto.skip_fields_with_unsupported_types_in_schema_inference = settings.input_format_capn_proto_skip_fields_with_unsupported_types_in_schema_inference; diff --git a/src/Formats/FormatSettings.h b/src/Formats/FormatSettings.h index d1755a35c5f..88a5adbc8df 100644 --- a/src/Formats/FormatSettings.h +++ b/src/Formats/FormatSettings.h @@ -86,6 +86,13 @@ struct FormatSettings UInt64 max_parser_depth = DBMS_DEFAULT_MAX_PARSER_DEPTH; + enum class ArrowCompression + { + NONE, + LZ4_FRAME, + ZSTD + }; + struct { UInt64 row_group_size = 1000000; @@ -96,6 +103,7 @@ struct FormatSettings bool case_insensitive_column_matching = false; bool output_string_as_string = false; bool output_fixed_string_as_fixed_byte_array = true; + ArrowCompression output_compression_method = ArrowCompression::NONE; } arrow; struct @@ -183,6 +191,16 @@ struct FormatSettings V2_LATEST, }; + enum class ParquetCompression + { + NONE, + SNAPPY, + ZSTD, + LZ4, + GZIP, + BROTLI, + }; + struct { UInt64 row_group_size = 1000000; @@ -195,6 +213,7 @@ struct FormatSettings bool output_fixed_string_as_fixed_byte_array = true; UInt64 max_block_size = 8192; ParquetVersion output_version; + ParquetCompression output_compression_method = ParquetCompression::SNAPPY; } parquet; struct Pretty @@ -276,6 +295,15 @@ struct FormatSettings bool accurate_types_of_literals = true; } values; + enum class ORCCompression + { + NONE, + LZ4, + SNAPPY, + ZSTD, + ZLIB, + }; + struct { bool import_nested = false; @@ -285,6 +313,7 @@ struct FormatSettings bool case_insensitive_column_matching = false; std::unordered_set skip_stripes = {}; bool output_string_as_string = false; + ORCCompression output_compression_method = ORCCompression::NONE; } orc; /// For capnProto format we should determine how to diff --git a/src/Processors/Formats/Impl/ArrowBlockOutputFormat.cpp b/src/Processors/Formats/Impl/ArrowBlockOutputFormat.cpp index bf0e2448082..ec35c52e37c 100644 --- a/src/Processors/Formats/Impl/ArrowBlockOutputFormat.cpp +++ b/src/Processors/Formats/Impl/ArrowBlockOutputFormat.cpp @@ -8,6 +8,7 @@ #include #include "ArrowBufferedStreams.h" #include "CHColumnToArrowColumn.h" +#include "config.h" namespace DB @@ -17,6 +18,25 @@ namespace ErrorCodes extern const int UNKNOWN_EXCEPTION; } +namespace +{ + +arrow::Compression::type getArrowCompression(FormatSettings::ArrowCompression method) +{ + switch (method) + { + case FormatSettings::ArrowCompression::NONE: + return arrow::Compression::type::UNCOMPRESSED; + case FormatSettings::ArrowCompression::ZSTD: + return arrow::Compression::type::ZSTD; + case FormatSettings::ArrowCompression::LZ4_FRAME: + return arrow::Compression::type::LZ4_FRAME; + } +} + +} + + ArrowBlockOutputFormat::ArrowBlockOutputFormat(WriteBuffer & out_, const Block & header_, bool stream_, const FormatSettings & format_settings_) : IOutputFormat(header_, out_) , stream{stream_} @@ -78,12 +98,14 @@ void ArrowBlockOutputFormat::prepareWriter(const std::shared_ptr { arrow_ostream = std::make_shared(out); arrow::Result> writer_status; + arrow::ipc::IpcWriteOptions options = arrow::ipc::IpcWriteOptions::Defaults(); + options.codec = *arrow::util::Codec::Create(getArrowCompression(format_settings.arrow.output_compression_method)); // TODO: should we use arrow::ipc::IpcOptions::alignment? if (stream) - writer_status = arrow::ipc::MakeStreamWriter(arrow_ostream.get(), schema); + writer_status = arrow::ipc::MakeStreamWriter(arrow_ostream.get(), schema, options); else - writer_status = arrow::ipc::MakeFileWriter(arrow_ostream.get(), schema); + writer_status = arrow::ipc::MakeFileWriter(arrow_ostream.get(), schema,options); if (!writer_status.ok()) throw Exception(ErrorCodes::UNKNOWN_EXCEPTION, diff --git a/src/Processors/Formats/Impl/ORCBlockOutputFormat.cpp b/src/Processors/Formats/Impl/ORCBlockOutputFormat.cpp index 42c3e178436..ecb7c2fbc92 100644 --- a/src/Processors/Formats/Impl/ORCBlockOutputFormat.cpp +++ b/src/Processors/Formats/Impl/ORCBlockOutputFormat.cpp @@ -22,12 +22,42 @@ #include #include +#include "config.h" + namespace DB { namespace ErrorCodes { extern const int ILLEGAL_COLUMN; + extern const int NOT_IMPLEMENTED; +} + +namespace +{ + +orc::CompressionKind getORCCompression(FormatSettings::ORCCompression method) +{ + if (method == FormatSettings::ORCCompression::NONE) + return orc::CompressionKind::CompressionKind_NONE; + +#if USE_SNAPPY + if (method == FormatSettings::ORCCompression::SNAPPY) + return orc::CompressionKind::CompressionKind_SNAPPY; +#endif + + if (method == FormatSettings::ORCCompression::ZSTD) + return orc::CompressionKind::CompressionKind_ZSTD; + + if (method == FormatSettings::ORCCompression::LZ4) + return orc::CompressionKind::CompressionKind_LZ4; + + if (method == FormatSettings::ORCCompression::ZLIB) + return orc::CompressionKind::CompressionKind_ZLIB; + + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unsupported compression method"); +} + } ORCOutputStream::ORCOutputStream(WriteBuffer & out_) : out(out_) {} @@ -529,7 +559,7 @@ void ORCBlockOutputFormat::prepareWriter() { const Block & header = getPort(PortKind::Main).getHeader(); schema = orc::createStructType(); - options.setCompression(orc::CompressionKind::CompressionKind_NONE); + options.setCompression(getORCCompression(format_settings.orc.output_compression_method)); size_t columns_count = header.columns(); for (size_t i = 0; i != columns_count; ++i) schema->addStructField(header.safeGetByPosition(i).name, getORCType(recursiveRemoveLowCardinality(data_types[i]))); diff --git a/src/Processors/Formats/Impl/ParquetBlockOutputFormat.cpp b/src/Processors/Formats/Impl/ParquetBlockOutputFormat.cpp index 18c81f8fd6a..742912df980 100644 --- a/src/Processors/Formats/Impl/ParquetBlockOutputFormat.cpp +++ b/src/Processors/Formats/Impl/ParquetBlockOutputFormat.cpp @@ -16,6 +16,9 @@ namespace ErrorCodes extern const int UNKNOWN_EXCEPTION; } +namespace +{ + static parquet::ParquetVersion::type getParquetVersion(const FormatSettings & settings) { switch (settings.parquet.output_version) @@ -31,6 +34,36 @@ static parquet::ParquetVersion::type getParquetVersion(const FormatSettings & se } } +parquet::Compression::type getParquetCompression(FormatSettings::ParquetCompression method) +{ + if (method == FormatSettings::ParquetCompression::NONE) + return parquet::Compression::type::UNCOMPRESSED; + +#if USE_SNAPPY + if (method == FormatSettings::ParquetCompression::SNAPPY) + return parquet::Compression::type::SNAPPY; +#endif + +#if USE_BROTLI + if (method == FormatSettings::ParquetCompression::BROTLI) + return parquet::Compression::type::BROTLI; +#endif + + if (method == FormatSettings::ParquetCompression::ZSTD) + return parquet::Compression::type::ZSTD; + + if (method == FormatSettings::ParquetCompression::LZ4) + return parquet::Compression::type::LZ4; + + if (method == FormatSettings::ParquetCompression::GZIP) + return parquet::Compression::type::GZIP; + + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unsupported compression method"); +} + + +} + ParquetBlockOutputFormat::ParquetBlockOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_) : IOutputFormat(header_, out_), format_settings{format_settings_} { @@ -60,9 +93,7 @@ void ParquetBlockOutputFormat::consume(Chunk chunk) parquet::WriterProperties::Builder builder; builder.version(getParquetVersion(format_settings)); -#if USE_SNAPPY - builder.compression(parquet::Compression::SNAPPY); -#endif + builder.compression(getParquetCompression(format_settings.parquet.output_compression_method)); auto props = builder.build(); auto status = parquet::arrow::FileWriter::Open( *arrow_table->schema(),