mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 15:42:02 +00:00
Merge pull request #47114 from Avogar/parquet-compression
Improve working with compression methods in Parquet/ORC/Arrow formats
This commit is contained in:
commit
71b6d6c6ae
@ -115,6 +115,13 @@ configure_file("${ORC_SOURCE_SRC_DIR}/Adaptor.hh.in" "${ORC_BUILD_INCLUDE_DIR}/A
|
||||
|
||||
# ARROW_ORC + adapters/orc/CMakefiles
|
||||
set(ORC_SRCS
|
||||
"${CMAKE_CURRENT_BINARY_DIR}/orc_proto.pb.h"
|
||||
"${ORC_SOURCE_SRC_DIR}/sargs/ExpressionTree.cc"
|
||||
"${ORC_SOURCE_SRC_DIR}/sargs/Literal.cc"
|
||||
"${ORC_SOURCE_SRC_DIR}/sargs/PredicateLeaf.cc"
|
||||
"${ORC_SOURCE_SRC_DIR}/sargs/SargsApplier.cc"
|
||||
"${ORC_SOURCE_SRC_DIR}/sargs/SearchArgument.cc"
|
||||
"${ORC_SOURCE_SRC_DIR}/sargs/TruthValue.cc"
|
||||
"${ORC_SOURCE_SRC_DIR}/Exceptions.cc"
|
||||
"${ORC_SOURCE_SRC_DIR}/OrcFile.cc"
|
||||
"${ORC_SOURCE_SRC_DIR}/Reader.cc"
|
||||
@ -129,13 +136,20 @@ set(ORC_SRCS
|
||||
"${ORC_SOURCE_SRC_DIR}/MemoryPool.cc"
|
||||
"${ORC_SOURCE_SRC_DIR}/RLE.cc"
|
||||
"${ORC_SOURCE_SRC_DIR}/RLEv1.cc"
|
||||
"${ORC_SOURCE_SRC_DIR}/RLEv2.cc"
|
||||
"${ORC_SOURCE_SRC_DIR}/RleDecoderV2.cc"
|
||||
"${ORC_SOURCE_SRC_DIR}/RleEncoderV2.cc"
|
||||
"${ORC_SOURCE_SRC_DIR}/RLEV2Util.cc"
|
||||
"${ORC_SOURCE_SRC_DIR}/Statistics.cc"
|
||||
"${ORC_SOURCE_SRC_DIR}/StripeStream.cc"
|
||||
"${ORC_SOURCE_SRC_DIR}/Timezone.cc"
|
||||
"${ORC_SOURCE_SRC_DIR}/TypeImpl.cc"
|
||||
"${ORC_SOURCE_SRC_DIR}/Vector.cc"
|
||||
"${ORC_SOURCE_SRC_DIR}/Writer.cc"
|
||||
"${ORC_SOURCE_SRC_DIR}/Adaptor.cc"
|
||||
"${ORC_SOURCE_SRC_DIR}/BloomFilter.cc"
|
||||
"${ORC_SOURCE_SRC_DIR}/Murmur3.cc"
|
||||
"${ORC_SOURCE_SRC_DIR}/BlockBuffer.cc"
|
||||
"${ORC_SOURCE_SRC_DIR}/wrap/orc-proto-wrapper.cc"
|
||||
"${ORC_SOURCE_SRC_DIR}/io/InputStream.cc"
|
||||
"${ORC_SOURCE_SRC_DIR}/io/OutputStream.cc"
|
||||
"${ORC_ADDITION_SOURCE_DIR}/orc_proto.pb.cc"
|
||||
@ -358,6 +372,9 @@ SET(ARROW_SRCS "${LIBRARY_DIR}/util/compression_zlib.cc" ${ARROW_SRCS})
|
||||
add_definitions(-DARROW_WITH_ZSTD)
|
||||
SET(ARROW_SRCS "${LIBRARY_DIR}/util/compression_zstd.cc" ${ARROW_SRCS})
|
||||
|
||||
add_definitions(-DARROW_WITH_BROTLI)
|
||||
SET(ARROW_SRCS "${LIBRARY_DIR}/util/compression_brotli.cc" ${ARROW_SRCS})
|
||||
|
||||
|
||||
add_library(_arrow ${ARROW_SRCS})
|
||||
|
||||
@ -372,6 +389,7 @@ target_link_libraries(_arrow PRIVATE
|
||||
ch_contrib::snappy
|
||||
ch_contrib::zlib
|
||||
ch_contrib::zstd
|
||||
ch_contrib::brotli
|
||||
)
|
||||
target_link_libraries(_arrow PUBLIC _orc)
|
||||
|
||||
|
2
contrib/orc
vendored
2
contrib/orc
vendored
@ -1 +1 @@
|
||||
Subproject commit f9a393ed2433a60034795284f82d093b348f2102
|
||||
Subproject commit c5d7755ba0b9a95631c8daea4d094101f26ec761
|
@ -1981,6 +1981,7 @@ To exchange data with Hadoop, you can use [HDFS table engine](/docs/en/engines/t
|
||||
- [input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference](/docs/en/operations/settings/settings-formats.md/#input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference) - allow skipping columns with unsupported types while schema inference for Parquet format. Default value - `false`.
|
||||
- [output_format_parquet_fixed_string_as_fixed_byte_array](/docs/en/operations/settings/settings-formats.md/#output_format_parquet_fixed_string_as_fixed_byte_array) - use Parquet FIXED_LENGTH_BYTE_ARRAY type instead of Binary/String for FixedString columns. Default value - `true`.
|
||||
- [output_format_parquet_version](/docs/en/operations/settings/settings-formats.md/#output_format_parquet_version) - The version of Parquet format used in output format. Default value - `2.latest`.
|
||||
- [output_format_parquet_compression_method](/docs/en/operations/settings/settings-formats.md/#output_format_parquet_compression_method) - compression method used in output Parquet format. Default value - `snappy`.
|
||||
|
||||
## Arrow {#data-format-arrow}
|
||||
|
||||
@ -2051,6 +2052,7 @@ $ clickhouse-client --query="SELECT * FROM {some_table} FORMAT Arrow" > {filenam
|
||||
- [input_format_arrow_allow_missing_columns](/docs/en/operations/settings/settings-formats.md/#input_format_arrow_allow_missing_columns) - allow missing columns while reading Arrow data. Default value - `false`.
|
||||
- [input_format_arrow_skip_columns_with_unsupported_types_in_schema_inference](/docs/en/operations/settings/settings-formats.md/#input_format_arrow_skip_columns_with_unsupported_types_in_schema_inference) - allow skipping columns with unsupported types while schema inference for Arrow format. Default value - `false`.
|
||||
- [output_format_arrow_fixed_string_as_fixed_byte_array](/docs/en/operations/settings/settings-formats.md/#output_format_arrow_fixed_string_as_fixed_byte_array) - use Arrow FIXED_SIZE_BINARY type instead of Binary/String for FixedString columns. Default value - `true`.
|
||||
- [output_format_arrow_compression_method](/docs/en/operations/settings/settings-formats.md/#output_format_arrow_compression_method) - compression method used in output Arrow format. Default value - `none`.
|
||||
|
||||
## ArrowStream {#data-format-arrow-stream}
|
||||
|
||||
@ -2107,6 +2109,7 @@ $ clickhouse-client --query="SELECT * FROM {some_table} FORMAT ORC" > {filename.
|
||||
### Arrow format settings {#parquet-format-settings}
|
||||
|
||||
- [output_format_arrow_string_as_string](/docs/en/operations/settings/settings-formats.md/#output_format_arrow_string_as_string) - use Arrow String type instead of Binary for String columns. Default value - `false`.
|
||||
- [output_format_orc_compression_method](/docs/en/operations/settings/settings-formats.md/#output_format_orc_compression_method) - compression method used in output ORC format. Default value - `none`.
|
||||
- [input_format_arrow_import_nested](/docs/en/operations/settings/settings-formats.md/#input_format_arrow_import_nested) - allow inserting array of structs into Nested table in Arrow input format. Default value - `false`.
|
||||
- [input_format_arrow_case_insensitive_column_matching](/docs/en/operations/settings/settings-formats.md/#input_format_arrow_case_insensitive_column_matching) - ignore case when matching Arrow columns with ClickHouse columns. Default value - `false`.
|
||||
- [input_format_arrow_allow_missing_columns](/docs/en/operations/settings/settings-formats.md/#input_format_arrow_allow_missing_columns) - allow missing columns while reading Arrow data. Default value - `false`.
|
||||
|
@ -1014,6 +1014,12 @@ Use Arrow FIXED_SIZE_BINARY type instead of Binary/String for FixedString column
|
||||
|
||||
Enabled by default.
|
||||
|
||||
### output_format_arrow_compression_method {#output_format_arrow_compression_method}
|
||||
|
||||
Compression method used in output Arrow format. Supported codecs: `lz4_frame`, `zstd`, `none` (uncompressed)
|
||||
|
||||
Default value: `none`.
|
||||
|
||||
## ORC format settings {#orc-format-settings}
|
||||
|
||||
### input_format_orc_import_nested {#input_format_orc_import_nested}
|
||||
@ -1057,6 +1063,12 @@ Use ORC String type instead of Binary for String columns.
|
||||
|
||||
Disabled by default.
|
||||
|
||||
### output_format_orc_compression_method {#output_format_orc_compression_method}
|
||||
|
||||
Compression method used in output ORC format. Supported codecs: `lz4`, `snappy`, `zlib`, `zstd`, `none` (uncompressed)
|
||||
|
||||
Default value: `none`.
|
||||
|
||||
## Parquet format settings {#parquet-format-settings}
|
||||
|
||||
### input_format_parquet_import_nested {#input_format_parquet_import_nested}
|
||||
@ -1112,6 +1124,12 @@ The version of Parquet format used in output format. Supported versions: `1.0`,
|
||||
|
||||
Default value: `2.latest`.
|
||||
|
||||
### output_format_parquet_compression_method {#output_format_parquet_compression_method}
|
||||
|
||||
Compression method used in output Parquet format. Supported codecs: `snappy`, `lz4`, `brotli`, `zstd`, `gzip`, `none` (uncompressed)
|
||||
|
||||
Default value: `snappy`.
|
||||
|
||||
## Hive format settings {#hive-format-settings}
|
||||
|
||||
### input_format_hive_text_fields_delimiter {#input_format_hive_text_fields_delimiter}
|
||||
|
@ -865,6 +865,7 @@ class IColumn;
|
||||
M(Bool, output_format_parquet_string_as_string, false, "Use Parquet String type instead of Binary for String columns.", 0) \
|
||||
M(Bool, output_format_parquet_fixed_string_as_fixed_byte_array, true, "Use Parquet FIXED_LENGTH_BYTE_ARRAY type instead of Binary for FixedString columns.", 0) \
|
||||
M(ParquetVersion, output_format_parquet_version, "2.latest", "Parquet format version for output format. Supported versions: 1.0, 2.4, 2.6 and 2.latest (default)", 0) \
|
||||
M(ParquetCompression, output_format_parquet_compression_method, "lz4", "Compression method for Parquet output format. Supported codecs: snappy, lz4, brotli, zstd, gzip, none (uncompressed)", 0) \
|
||||
M(String, output_format_avro_codec, "", "Compression codec used for output. Possible values: 'null', 'deflate', 'snappy'.", 0) \
|
||||
M(UInt64, output_format_avro_sync_interval, 16 * 1024, "Sync interval in bytes.", 0) \
|
||||
M(String, output_format_avro_string_column_pattern, "", "For Avro format: regexp of String columns to select as AVRO string.", 0) \
|
||||
@ -907,8 +908,10 @@ class IColumn;
|
||||
M(Bool, output_format_arrow_low_cardinality_as_dictionary, false, "Enable output LowCardinality type as Dictionary Arrow type", 0) \
|
||||
M(Bool, output_format_arrow_string_as_string, false, "Use Arrow String type instead of Binary for String columns", 0) \
|
||||
M(Bool, output_format_arrow_fixed_string_as_fixed_byte_array, true, "Use Arrow FIXED_SIZE_BINARY type instead of Binary for FixedString columns.", 0) \
|
||||
M(ArrowCompression, output_format_arrow_compression_method, "lz4_frame", "Compression method for Arrow output format. Supported codecs: lz4_frame, zstd, none (uncompressed)", 0) \
|
||||
\
|
||||
M(Bool, output_format_orc_string_as_string, false, "Use ORC String type instead of Binary for String columns", 0) \
|
||||
M(ORCCompression, output_format_orc_compression_method, "lz4", "Compression method for ORC output format. Supported codecs: lz4, snappy, zlib, zstd, none (uncompressed)", 0) \
|
||||
\
|
||||
M(EnumComparingMode, format_capn_proto_enum_comparising_mode, FormatSettings::EnumComparingMode::BY_VALUES, "How to map ClickHouse Enum and CapnProto Enum", 0) \
|
||||
\
|
||||
|
@ -81,7 +81,10 @@ namespace SettingsChangesHistory
|
||||
static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> settings_changes_history =
|
||||
{
|
||||
{"23.3", {{"output_format_parquet_version", "1.0", "2.latest", "Use latest Parquet format version for output format"},
|
||||
{"input_format_json_ignore_unknown_keys_in_named_tuple", false, true, "Improve parsing JSON objects as named tuples"}}},
|
||||
{"input_format_json_ignore_unknown_keys_in_named_tuple", false, true, "Improve parsing JSON objects as named tuples"},
|
||||
{"output_format_arrow_compression_method", "none", "lz4_frame", "Use lz4 compression in Arrow output format by default"},
|
||||
{"output_format_parquet_compression_method", "snappy", "lz4", "Use lz4 compression in Parquet output format by default"},
|
||||
{"output_format_orc_compression_method", "none", "lz4_frame", "Use lz4 compression in ORC output format by default"}}},
|
||||
{"23.2", {{"output_format_parquet_fixed_string_as_fixed_byte_array", false, true, "Use Parquet FIXED_LENGTH_BYTE_ARRAY type for FixedString by default"},
|
||||
{"output_format_arrow_fixed_string_as_fixed_byte_array", false, true, "Use Arrow FIXED_SIZE_BINARY type for FixedString by default"},
|
||||
{"query_plan_remove_redundant_distinct", false, true, "Remove redundant Distinct step in query plan"},
|
||||
|
@ -158,7 +158,7 @@ IMPLEMENT_SETTING_ENUM(EscapingRule, ErrorCodes::BAD_ARGUMENTS,
|
||||
{"XML", FormatSettings::EscapingRule::XML},
|
||||
{"Raw", FormatSettings::EscapingRule::Raw}})
|
||||
|
||||
IMPLEMENT_SETTING_ENUM(MsgPackUUIDRepresentation , ErrorCodes::BAD_ARGUMENTS,
|
||||
IMPLEMENT_SETTING_ENUM(MsgPackUUIDRepresentation, ErrorCodes::BAD_ARGUMENTS,
|
||||
{{"bin", FormatSettings::MsgPackUUIDRepresentation::BIN},
|
||||
{"str", FormatSettings::MsgPackUUIDRepresentation::STR},
|
||||
{"ext", FormatSettings::MsgPackUUIDRepresentation::EXT}})
|
||||
@ -176,11 +176,30 @@ IMPLEMENT_SETTING_ENUM(LocalFSReadMethod, ErrorCodes::BAD_ARGUMENTS,
|
||||
{"pread", LocalFSReadMethod::pread},
|
||||
{"read", LocalFSReadMethod::read}})
|
||||
|
||||
|
||||
IMPLEMENT_SETTING_ENUM_WITH_RENAME(ParquetVersion, ErrorCodes::BAD_ARGUMENTS,
|
||||
{{"1.0", FormatSettings::ParquetVersion::V1_0},
|
||||
{"2.4", FormatSettings::ParquetVersion::V2_4},
|
||||
{"2.6", FormatSettings::ParquetVersion::V2_6},
|
||||
{"2.latest", FormatSettings::ParquetVersion::V2_LATEST}})
|
||||
|
||||
IMPLEMENT_SETTING_ENUM(ParquetCompression, ErrorCodes::BAD_ARGUMENTS,
|
||||
{{"none", FormatSettings::ParquetCompression::NONE},
|
||||
{"snappy", FormatSettings::ParquetCompression::SNAPPY},
|
||||
{"zstd", FormatSettings::ParquetCompression::ZSTD},
|
||||
{"gzip", FormatSettings::ParquetCompression::GZIP},
|
||||
{"lz4", FormatSettings::ParquetCompression::LZ4},
|
||||
{"brotli", FormatSettings::ParquetCompression::BROTLI}})
|
||||
|
||||
IMPLEMENT_SETTING_ENUM(ArrowCompression, ErrorCodes::BAD_ARGUMENTS,
|
||||
{{"none", FormatSettings::ArrowCompression::NONE},
|
||||
{"lz4_frame", FormatSettings::ArrowCompression::LZ4_FRAME},
|
||||
{"zstd", FormatSettings::ArrowCompression::ZSTD}})
|
||||
|
||||
IMPLEMENT_SETTING_ENUM(ORCCompression, ErrorCodes::BAD_ARGUMENTS,
|
||||
{{"none", FormatSettings::ORCCompression::NONE},
|
||||
{"snappy", FormatSettings::ORCCompression::SNAPPY},
|
||||
{"zstd", FormatSettings::ORCCompression::ZSTD},
|
||||
{"zlib", FormatSettings::ORCCompression::ZLIB},
|
||||
{"lz4", FormatSettings::ORCCompression::LZ4}})
|
||||
|
||||
}
|
||||
|
@ -194,6 +194,12 @@ DECLARE_SETTING_ENUM_WITH_RENAME(EscapingRule, FormatSettings::EscapingRule)
|
||||
|
||||
DECLARE_SETTING_ENUM_WITH_RENAME(MsgPackUUIDRepresentation, FormatSettings::MsgPackUUIDRepresentation)
|
||||
|
||||
DECLARE_SETTING_ENUM_WITH_RENAME(ParquetCompression, FormatSettings::ParquetCompression)
|
||||
|
||||
DECLARE_SETTING_ENUM_WITH_RENAME(ArrowCompression, FormatSettings::ArrowCompression)
|
||||
|
||||
DECLARE_SETTING_ENUM_WITH_RENAME(ORCCompression, FormatSettings::ORCCompression)
|
||||
|
||||
enum class Dialect
|
||||
{
|
||||
clickhouse,
|
||||
|
@ -118,6 +118,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
|
||||
format_settings.parquet.output_string_as_string = settings.output_format_parquet_string_as_string;
|
||||
format_settings.parquet.output_fixed_string_as_fixed_byte_array = settings.output_format_parquet_fixed_string_as_fixed_byte_array;
|
||||
format_settings.parquet.max_block_size = settings.input_format_parquet_max_block_size;
|
||||
format_settings.parquet.output_compression_method = settings.output_format_parquet_compression_method;
|
||||
format_settings.pretty.charset = settings.output_format_pretty_grid_charset.toString() == "ASCII" ? FormatSettings::Pretty::Charset::ASCII : FormatSettings::Pretty::Charset::UTF8;
|
||||
format_settings.pretty.color = settings.output_format_pretty_color;
|
||||
format_settings.pretty.max_column_pad_width = settings.output_format_pretty_max_column_pad_width;
|
||||
@ -158,6 +159,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
|
||||
format_settings.arrow.case_insensitive_column_matching = settings.input_format_arrow_case_insensitive_column_matching;
|
||||
format_settings.arrow.output_string_as_string = settings.output_format_arrow_string_as_string;
|
||||
format_settings.arrow.output_fixed_string_as_fixed_byte_array = settings.output_format_arrow_fixed_string_as_fixed_byte_array;
|
||||
format_settings.arrow.output_compression_method = settings.output_format_arrow_compression_method;
|
||||
format_settings.orc.import_nested = settings.input_format_orc_import_nested;
|
||||
format_settings.orc.allow_missing_columns = settings.input_format_orc_allow_missing_columns;
|
||||
format_settings.orc.row_batch_size = settings.input_format_orc_row_batch_size;
|
||||
@ -168,6 +170,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
|
||||
format_settings.orc.skip_columns_with_unsupported_types_in_schema_inference = settings.input_format_orc_skip_columns_with_unsupported_types_in_schema_inference;
|
||||
format_settings.orc.case_insensitive_column_matching = settings.input_format_orc_case_insensitive_column_matching;
|
||||
format_settings.orc.output_string_as_string = settings.output_format_orc_string_as_string;
|
||||
format_settings.orc.output_compression_method = settings.output_format_orc_compression_method;
|
||||
format_settings.defaults_for_omitted_fields = settings.input_format_defaults_for_omitted_fields;
|
||||
format_settings.capn_proto.enum_comparing_mode = settings.format_capn_proto_enum_comparising_mode;
|
||||
format_settings.capn_proto.skip_fields_with_unsupported_types_in_schema_inference = settings.input_format_capn_proto_skip_fields_with_unsupported_types_in_schema_inference;
|
||||
|
@ -86,6 +86,13 @@ struct FormatSettings
|
||||
|
||||
UInt64 max_parser_depth = DBMS_DEFAULT_MAX_PARSER_DEPTH;
|
||||
|
||||
enum class ArrowCompression
|
||||
{
|
||||
NONE,
|
||||
LZ4_FRAME,
|
||||
ZSTD
|
||||
};
|
||||
|
||||
struct
|
||||
{
|
||||
UInt64 row_group_size = 1000000;
|
||||
@ -96,6 +103,7 @@ struct FormatSettings
|
||||
bool case_insensitive_column_matching = false;
|
||||
bool output_string_as_string = false;
|
||||
bool output_fixed_string_as_fixed_byte_array = true;
|
||||
ArrowCompression output_compression_method = ArrowCompression::NONE;
|
||||
} arrow;
|
||||
|
||||
struct
|
||||
@ -183,6 +191,16 @@ struct FormatSettings
|
||||
V2_LATEST,
|
||||
};
|
||||
|
||||
enum class ParquetCompression
|
||||
{
|
||||
NONE,
|
||||
SNAPPY,
|
||||
ZSTD,
|
||||
LZ4,
|
||||
GZIP,
|
||||
BROTLI,
|
||||
};
|
||||
|
||||
struct
|
||||
{
|
||||
UInt64 row_group_size = 1000000;
|
||||
@ -195,6 +213,7 @@ struct FormatSettings
|
||||
bool output_fixed_string_as_fixed_byte_array = true;
|
||||
UInt64 max_block_size = 8192;
|
||||
ParquetVersion output_version;
|
||||
ParquetCompression output_compression_method = ParquetCompression::SNAPPY;
|
||||
} parquet;
|
||||
|
||||
struct Pretty
|
||||
@ -276,6 +295,15 @@ struct FormatSettings
|
||||
bool accurate_types_of_literals = true;
|
||||
} values;
|
||||
|
||||
enum class ORCCompression
|
||||
{
|
||||
NONE,
|
||||
LZ4,
|
||||
SNAPPY,
|
||||
ZSTD,
|
||||
ZLIB,
|
||||
};
|
||||
|
||||
struct
|
||||
{
|
||||
bool import_nested = false;
|
||||
@ -285,6 +313,7 @@ struct FormatSettings
|
||||
bool case_insensitive_column_matching = false;
|
||||
std::unordered_set<int> skip_stripes = {};
|
||||
bool output_string_as_string = false;
|
||||
ORCCompression output_compression_method = ORCCompression::NONE;
|
||||
} orc;
|
||||
|
||||
/// For capnProto format we should determine how to
|
||||
|
@ -17,6 +17,24 @@ namespace ErrorCodes
|
||||
extern const int UNKNOWN_EXCEPTION;
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
arrow::Compression::type getArrowCompression(FormatSettings::ArrowCompression method)
|
||||
{
|
||||
switch (method)
|
||||
{
|
||||
case FormatSettings::ArrowCompression::NONE:
|
||||
return arrow::Compression::type::UNCOMPRESSED;
|
||||
case FormatSettings::ArrowCompression::ZSTD:
|
||||
return arrow::Compression::type::ZSTD;
|
||||
case FormatSettings::ArrowCompression::LZ4_FRAME:
|
||||
return arrow::Compression::type::LZ4_FRAME;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
ArrowBlockOutputFormat::ArrowBlockOutputFormat(WriteBuffer & out_, const Block & header_, bool stream_, const FormatSettings & format_settings_)
|
||||
: IOutputFormat(header_, out_)
|
||||
, stream{stream_}
|
||||
@ -78,12 +96,14 @@ void ArrowBlockOutputFormat::prepareWriter(const std::shared_ptr<arrow::Schema>
|
||||
{
|
||||
arrow_ostream = std::make_shared<ArrowBufferedOutputStream>(out);
|
||||
arrow::Result<std::shared_ptr<arrow::ipc::RecordBatchWriter>> writer_status;
|
||||
arrow::ipc::IpcWriteOptions options = arrow::ipc::IpcWriteOptions::Defaults();
|
||||
options.codec = *arrow::util::Codec::Create(getArrowCompression(format_settings.arrow.output_compression_method));
|
||||
|
||||
// TODO: should we use arrow::ipc::IpcOptions::alignment?
|
||||
if (stream)
|
||||
writer_status = arrow::ipc::MakeStreamWriter(arrow_ostream.get(), schema);
|
||||
writer_status = arrow::ipc::MakeStreamWriter(arrow_ostream.get(), schema, options);
|
||||
else
|
||||
writer_status = arrow::ipc::MakeFileWriter(arrow_ostream.get(), schema);
|
||||
writer_status = arrow::ipc::MakeFileWriter(arrow_ostream.get(), schema,options);
|
||||
|
||||
if (!writer_status.ok())
|
||||
throw Exception(ErrorCodes::UNKNOWN_EXCEPTION,
|
||||
|
@ -28,6 +28,34 @@ namespace DB
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int ILLEGAL_COLUMN;
|
||||
extern const int NOT_IMPLEMENTED;
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
orc::CompressionKind getORCCompression(FormatSettings::ORCCompression method)
|
||||
{
|
||||
if (method == FormatSettings::ORCCompression::NONE)
|
||||
return orc::CompressionKind::CompressionKind_NONE;
|
||||
|
||||
#if USE_SNAPPY
|
||||
if (method == FormatSettings::ORCCompression::SNAPPY)
|
||||
return orc::CompressionKind::CompressionKind_SNAPPY;
|
||||
#endif
|
||||
|
||||
if (method == FormatSettings::ORCCompression::ZSTD)
|
||||
return orc::CompressionKind::CompressionKind_ZSTD;
|
||||
|
||||
if (method == FormatSettings::ORCCompression::LZ4)
|
||||
return orc::CompressionKind::CompressionKind_LZ4;
|
||||
|
||||
if (method == FormatSettings::ORCCompression::ZLIB)
|
||||
return orc::CompressionKind::CompressionKind_ZLIB;
|
||||
|
||||
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unsupported compression method");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
ORCOutputStream::ORCOutputStream(WriteBuffer & out_) : out(out_) {}
|
||||
@ -544,7 +572,7 @@ void ORCBlockOutputFormat::prepareWriter()
|
||||
{
|
||||
const Block & header = getPort(PortKind::Main).getHeader();
|
||||
schema = orc::createStructType();
|
||||
options.setCompression(orc::CompressionKind::CompressionKind_NONE);
|
||||
options.setCompression(getORCCompression(format_settings.orc.output_compression_method));
|
||||
size_t columns_count = header.columns();
|
||||
for (size_t i = 0; i != columns_count; ++i)
|
||||
schema->addStructField(header.safeGetByPosition(i).name, getORCType(recursiveRemoveLowCardinality(data_types[i])));
|
||||
|
@ -14,9 +14,13 @@ namespace DB
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int UNKNOWN_EXCEPTION;
|
||||
extern const int NOT_IMPLEMENTED;
|
||||
}
|
||||
|
||||
static parquet::ParquetVersion::type getParquetVersion(const FormatSettings & settings)
|
||||
namespace
|
||||
{
|
||||
|
||||
parquet::ParquetVersion::type getParquetVersion(const FormatSettings & settings)
|
||||
{
|
||||
switch (settings.parquet.output_version)
|
||||
{
|
||||
@ -31,6 +35,35 @@ static parquet::ParquetVersion::type getParquetVersion(const FormatSettings & se
|
||||
}
|
||||
}
|
||||
|
||||
parquet::Compression::type getParquetCompression(FormatSettings::ParquetCompression method)
|
||||
{
|
||||
if (method == FormatSettings::ParquetCompression::NONE)
|
||||
return parquet::Compression::type::UNCOMPRESSED;
|
||||
|
||||
#if USE_SNAPPY
|
||||
if (method == FormatSettings::ParquetCompression::SNAPPY)
|
||||
return parquet::Compression::type::SNAPPY;
|
||||
#endif
|
||||
|
||||
#if USE_BROTLI
|
||||
if (method == FormatSettings::ParquetCompression::BROTLI)
|
||||
return parquet::Compression::type::BROTLI;
|
||||
#endif
|
||||
|
||||
if (method == FormatSettings::ParquetCompression::ZSTD)
|
||||
return parquet::Compression::type::ZSTD;
|
||||
|
||||
if (method == FormatSettings::ParquetCompression::LZ4)
|
||||
return parquet::Compression::type::LZ4;
|
||||
|
||||
if (method == FormatSettings::ParquetCompression::GZIP)
|
||||
return parquet::Compression::type::GZIP;
|
||||
|
||||
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unsupported compression method");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
ParquetBlockOutputFormat::ParquetBlockOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_)
|
||||
: IOutputFormat(header_, out_), format_settings{format_settings_}
|
||||
{
|
||||
@ -60,9 +93,7 @@ void ParquetBlockOutputFormat::consume(Chunk chunk)
|
||||
|
||||
parquet::WriterProperties::Builder builder;
|
||||
builder.version(getParquetVersion(format_settings));
|
||||
#if USE_SNAPPY
|
||||
builder.compression(parquet::Compression::SNAPPY);
|
||||
#endif
|
||||
builder.compression(getParquetCompression(format_settings.parquet.output_compression_method));
|
||||
auto props = builder.build();
|
||||
auto status = parquet::arrow::FileWriter::Open(
|
||||
*arrow_table->schema(),
|
||||
|
Binary file not shown.
@ -11,7 +11,7 @@ $CLICKHOUSE_CLIENT --query="CREATE TABLE orc (array1 Array(Int32), array2 Array(
|
||||
|
||||
$CLICKHOUSE_CLIENT --query="INSERT INTO orc VALUES ([1,2,3,4,5], [[1,2], [3,4], [5]]), ([42], [[42, 42], [42]])";
|
||||
|
||||
$CLICKHOUSE_CLIENT --query="SELECT * FROM orc FORMAT ORC";
|
||||
$CLICKHOUSE_CLIENT --query="SELECT * FROM orc FORMAT ORC SETTINGS output_format_orc_compression_method='none'" | md5sum;
|
||||
|
||||
$CLICKHOUSE_CLIENT --query="DROP TABLE orc";
|
||||
|
||||
|
Binary file not shown.
9
tests/queries/0_stateless/02426_orc_bug.sh
Executable file
9
tests/queries/0_stateless/02426_orc_bug.sh
Executable file
@ -0,0 +1,9 @@
|
||||
#!/usr/bin/env bash
|
||||
# Tags: no-fasttest
|
||||
|
||||
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
# shellcheck source=../shell_config.sh
|
||||
. "$CURDIR"/../shell_config.sh
|
||||
|
||||
$CLICKHOUSE_CLIENT --query="SELECT arrayJoin([[], [1]]) FORMAT ORC SETTINGS output_format_orc_compression_method='none'" | md5sum;
|
||||
|
@ -1,3 +0,0 @@
|
||||
-- Tags: no-fasttest
|
||||
|
||||
SELECT arrayJoin([[], [1]]) FORMAT ORC;
|
@ -0,0 +1,14 @@
|
||||
10
|
||||
10
|
||||
10
|
||||
10
|
||||
10
|
||||
10
|
||||
10
|
||||
10
|
||||
10
|
||||
10
|
||||
10
|
||||
10
|
||||
10
|
||||
10
|
25
tests/queries/0_stateless/02581_parquet_arrow_orc_compressions.sh
Executable file
25
tests/queries/0_stateless/02581_parquet_arrow_orc_compressions.sh
Executable file
@ -0,0 +1,25 @@
|
||||
#!/usr/bin/env bash
|
||||
# Tags: no-fasttest
|
||||
|
||||
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
# shellcheck source=../shell_config.sh
|
||||
. "$CURDIR"/../shell_config.sh
|
||||
|
||||
$CLICKHOUSE_LOCAL -q "select * from numbers(10) format Parquet settings output_format_parquet_compression_method='none'" | $CLICKHOUSE_LOCAL --input-format=Parquet -q "select count() from table"
|
||||
$CLICKHOUSE_LOCAL -q "select * from numbers(10) format Parquet settings output_format_parquet_compression_method='lz4'" | $CLICKHOUSE_LOCAL --input-format=Parquet -q "select count() from table"
|
||||
$CLICKHOUSE_LOCAL -q "select * from numbers(10) format Parquet settings output_format_parquet_compression_method='snappy'" | $CLICKHOUSE_LOCAL --input-format=Parquet -q "select count() from table"
|
||||
$CLICKHOUSE_LOCAL -q "select * from numbers(10) format Parquet settings output_format_parquet_compression_method='zstd'" | $CLICKHOUSE_LOCAL --input-format=Parquet -q "select count() from table"
|
||||
$CLICKHOUSE_LOCAL -q "select * from numbers(10) format Parquet settings output_format_parquet_compression_method='brotli'" | $CLICKHOUSE_LOCAL --input-format=Parquet -q "select count() from table"
|
||||
$CLICKHOUSE_LOCAL -q "select * from numbers(10) format Parquet settings output_format_parquet_compression_method='gzip'" | $CLICKHOUSE_LOCAL --input-format=Parquet -q "select count() from table"
|
||||
|
||||
$CLICKHOUSE_LOCAL -q "select * from numbers(10) format ORC settings output_format_orc_compression_method='none'" | $CLICKHOUSE_LOCAL --input-format=ORC -q "select count() from table"
|
||||
$CLICKHOUSE_LOCAL -q "select * from numbers(10) format ORC settings output_format_orc_compression_method='lz4'" | $CLICKHOUSE_LOCAL --input-format=ORC -q "select count() from table"
|
||||
$CLICKHOUSE_LOCAL -q "select * from numbers(10) format ORC settings output_format_orc_compression_method='zstd'" | $CLICKHOUSE_LOCAL --input-format=ORC -q "select count() from table"
|
||||
$CLICKHOUSE_LOCAL -q "select * from numbers(10) format ORC settings output_format_orc_compression_method='zlib'" | $CLICKHOUSE_LOCAL --input-format=ORC -q "select count() from table"
|
||||
$CLICKHOUSE_LOCAL -q "select * from numbers(10) format ORC settings output_format_orc_compression_method='snappy'" | $CLICKHOUSE_LOCAL --input-format=ORC -q "select count() from table"
|
||||
|
||||
|
||||
$CLICKHOUSE_LOCAL -q "select * from numbers(10) format Arrow settings output_format_arrow_compression_method='none'" | $CLICKHOUSE_LOCAL --input-format=Arrow -q "select count() from table"
|
||||
$CLICKHOUSE_LOCAL -q "select * from numbers(10) format Arrow settings output_format_arrow_compression_method='lz4_frame'" | $CLICKHOUSE_LOCAL --input-format=Arrow -q "select count() from table"
|
||||
$CLICKHOUSE_LOCAL -q "select * from numbers(10) format Arrow settings output_format_arrow_compression_method='zstd'" | $CLICKHOUSE_LOCAL --input-format=Arrow -q "select count() from table"
|
||||
|
Loading…
Reference in New Issue
Block a user