From 93aee32ae4b5846a08069ccb1e7154c2b9418f8b Mon Sep 17 00:00:00 2001 From: Avogar Date: Sun, 14 Jun 2020 18:35:32 +0300 Subject: [PATCH] Add ORCBlockOutputFormat --- src/Formats/FormatFactory.cpp | 1 + src/Formats/FormatFactory.h | 4 +- .../Formats/Impl/ORCBlockOutputFormat.cpp | 409 ++++++++++++++++++ .../Formats/Impl/ORCBlockOutputFormat.h | 70 +++ .../01307_orc_output_format.reference | 6 + .../0_stateless/01307_orc_output_format.sh | 20 + 6 files changed, 509 insertions(+), 1 deletion(-) create mode 100644 src/Processors/Formats/Impl/ORCBlockOutputFormat.cpp create mode 100644 src/Processors/Formats/Impl/ORCBlockOutputFormat.h create mode 100644 tests/queries/0_stateless/01307_orc_output_format.reference create mode 100755 tests/queries/0_stateless/01307_orc_output_format.sh diff --git a/src/Formats/FormatFactory.cpp b/src/Formats/FormatFactory.cpp index 9182c728600..e1bb40c737c 100644 --- a/src/Formats/FormatFactory.cpp +++ b/src/Formats/FormatFactory.cpp @@ -394,6 +394,7 @@ FormatFactory::FormatFactory() registerOutputFormatProcessorNull(*this); registerOutputFormatProcessorMySQLWrite(*this); registerOutputFormatProcessorMarkdown(*this); + registerOutputFormatProcessorORC(*this); } FormatFactory & FormatFactory::instance() diff --git a/src/Formats/FormatFactory.h b/src/Formats/FormatFactory.h index c8dd97aa940..9c1a23d7164 100644 --- a/src/Formats/FormatFactory.h +++ b/src/Formats/FormatFactory.h @@ -175,6 +175,9 @@ void registerInputFormatProcessorTemplate(FormatFactory & factory); void registerOutputFormatProcessorTemplate(FormatFactory & factory); void registerInputFormatProcessorMsgPack(FormatFactory & factory); void registerOutputFormatProcessorMsgPack(FormatFactory & factory); +void registerInputFormatProcessorORC(FormatFactory & factory); +void registerOutputFormatProcessorORC(FormatFactory & factory); + /// File Segmentation Engines for parallel reading @@ -206,6 +209,5 @@ void registerOutputFormatProcessorMarkdown(FormatFactory & factory); void 
registerInputFormatProcessorCapnProto(FormatFactory & factory); void registerInputFormatProcessorRegexp(FormatFactory & factory); void registerInputFormatProcessorJSONAsString(FormatFactory & factory); -void registerInputFormatProcessorORC(FormatFactory & factory); } diff --git a/src/Processors/Formats/Impl/ORCBlockOutputFormat.cpp b/src/Processors/Formats/Impl/ORCBlockOutputFormat.cpp new file mode 100644 index 00000000000..3745ee229a8 --- /dev/null +++ b/src/Processors/Formats/Impl/ORCBlockOutputFormat.cpp @@ -0,0 +1,409 @@ +#include + +#include +#include + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int ILLEGAL_COLUMN; +} + +ORCOutputStream::ORCOutputStream(WriteBuffer & out_) : out(out_) {} + +uint64_t ORCOutputStream::getLength() const +{ + return out.count(); +} + +uint64_t ORCOutputStream::getNaturalWriteSize() const +{ + out.nextIfAtEnd(); + return out.available(); +} + +void ORCOutputStream::write(const void* buf, size_t length) +{ + out.write(static_cast(buf), length); +} + +ORCBlockOutputFormat::ORCBlockOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_) + : IOutputFormat(header_, out_), format_settings{format_settings_}, output_stream(out_), data_types(header_.getDataTypes()) +{ + schema = orc::createStructType(); + size_t columns_count = header_.columns(); + for (size_t i = 0; i != columns_count; ++i) + { + schema->addStructField(header_.safeGetByPosition(i).name, getORCType(data_types[i])); + } + writer = orc::createWriter(*schema, &output_stream, options); +} + +ORC_UNIQUE_PTR ORCBlockOutputFormat::getORCType(const DataTypePtr & type) +{ + switch (type->getTypeId()) + { + case TypeIndex::UInt8: [[fallthrough]]; + case TypeIndex::Int8: + { + return orc::createPrimitiveType(orc::TypeKind::BYTE); + } + case TypeIndex::UInt16: [[fallthrough]]; + case TypeIndex::Int16: + { + return 
orc::createPrimitiveType(orc::TypeKind::SHORT); + } + case TypeIndex::UInt32: [[fallthrough]]; + case TypeIndex::Int32: + { + return orc::createPrimitiveType(orc::TypeKind::INT); + } + case TypeIndex::UInt64: [[fallthrough]]; + case TypeIndex::Int64: + { + return orc::createPrimitiveType(orc::TypeKind::LONG); + } + case TypeIndex::Float32: + { + return orc::createPrimitiveType(orc::TypeKind::FLOAT); + } + case TypeIndex::Float64: + { + return orc::createPrimitiveType(orc::TypeKind::DOUBLE); + } + case TypeIndex::Date: + { + return orc::createPrimitiveType(orc::TypeKind::DATE); + } + case TypeIndex::DateTime: [[fallthrough]]; + case TypeIndex::DateTime64: + { + return orc::createPrimitiveType(orc::TypeKind::TIMESTAMP); + } + case TypeIndex::FixedString: [[fallthrough]]; + case TypeIndex::String: + { + return orc::createPrimitiveType(orc::TypeKind::STRING); + } + case TypeIndex::Nullable: + { + return getORCType(removeNullable(type)); + } + /* + case TypeIndex::Array: + { + const auto * array_type = typeid_cast(type.get()); + return orc::createListType(getORCType(array_type->getNestedType())); + } + */ + case TypeIndex::Decimal32: + { + const auto * decimal_type = typeid_cast *>(type.get()); + return orc::createDecimalType(decimal_type->getPrecision(), decimal_type->getScale()); + } + case TypeIndex::Decimal64: + { + const auto * decimal_type = typeid_cast *>(type.get()); + return orc::createDecimalType(decimal_type->getPrecision(), decimal_type->getScale()); + } + case TypeIndex::Decimal128: + { + const auto * decimal_type = typeid_cast *>(type.get()); + return orc::createDecimalType(decimal_type->getPrecision(), decimal_type->getScale()); + } + default: + { + throw Exception("Type " + type->getName() + " is not supported for ORC output format", ErrorCodes::ILLEGAL_COLUMN); + } + } +} + +template +void ORCBlockOutputFormat::ORCBlockOutputFormat::writeNumbers( + orc::ColumnVectorBatch * orc_column, + const IColumn & column, + const PaddedPODArray * null_bytemap, + 
size_t rows_num) +{ + NumberVectorBatch * number_orc_column = dynamic_cast(orc_column); + const auto & number_column = assert_cast &>(column); + number_orc_column->resize(rows_num); + + for (size_t i = 0; i != rows_num; ++i) + { + if (null_bytemap && (*null_bytemap)[i]) + { + number_orc_column->notNull[i] = 0; + continue; + } + number_orc_column->data[i] = number_column.getElement(i); + } + number_orc_column->numElements = rows_num; +} + +template +void ORCBlockOutputFormat::ORCBlockOutputFormat::writeDecimals( + orc::ColumnVectorBatch * orc_column, + const IColumn & column, + DataTypePtr & type, + const PaddedPODArray * null_bytemap, + size_t rows_num, + ConvertFunc convert) +{ + DecimalVectorBatch *decimal_orc_column = dynamic_cast(orc_column); + const auto & decimal_column = assert_cast &>(column); + const auto * decimal_type = typeid_cast *>(type.get()); + decimal_orc_column->precision = decimal_type->getPrecision(); + decimal_orc_column->scale = decimal_type->getScale(); + decimal_orc_column->resize(rows_num); + for (size_t i = 0; i != rows_num; ++i) + { + if (null_bytemap && (*null_bytemap)[i]) + { + decimal_orc_column->notNull[i] = 0; + continue; + } + decimal_orc_column->values[i] = convert(decimal_column.getElement(i).value); + } + decimal_orc_column->numElements = rows_num; +} + +void ORCBlockOutputFormat::writeColumn( + orc::ColumnVectorBatch * orc_column, + const IColumn & column, + DataTypePtr & type, + const PaddedPODArray * null_bytemap, + size_t rows_num) +{ + if (null_bytemap) + { + orc_column->hasNulls = true; + } + switch (type->getTypeId()) + { + case TypeIndex::Int8: + { + writeNumbers(orc_column, column, null_bytemap, rows_num); + break; + } + case TypeIndex::UInt8: + { + writeNumbers(orc_column, column, null_bytemap, rows_num); + break; + } + case TypeIndex::Int16: + { + writeNumbers(orc_column, column, null_bytemap, rows_num); + break; + } + case TypeIndex::Date: [[fallthrough]]; + case TypeIndex::UInt16: + { + writeNumbers(orc_column, 
column, null_bytemap, rows_num); + break; + } + case TypeIndex::Int32: + { + writeNumbers(orc_column, column, null_bytemap, rows_num); + break; + } + case TypeIndex::UInt32: + { + writeNumbers(orc_column, column, null_bytemap, rows_num); + break; + } + case TypeIndex::Int64: + { + writeNumbers(orc_column, column, null_bytemap, rows_num); + break; + } + case TypeIndex::UInt64: + { + writeNumbers(orc_column, column, null_bytemap, rows_num); + break; + } + case TypeIndex::Float32: + { + writeNumbers(orc_column, column, null_bytemap, rows_num); + break; + } + case TypeIndex::Float64: + { + writeNumbers(orc_column, column, null_bytemap, rows_num); + break; + } + case TypeIndex::FixedString: [[fallthrough]]; + case TypeIndex::String: + { + orc::StringVectorBatch * string_orc_column = dynamic_cast(orc_column); + const auto & string_column = assert_cast(column); + string_orc_column->resize(rows_num); + + for (size_t i = 0; i != rows_num; ++i) + { + if (null_bytemap && (*null_bytemap)[i]) + { + string_orc_column->notNull[i] = 0; + continue; + } + const StringRef & string = string_column.getDataAt(i); + string_orc_column->data[i] = const_cast(string.data); + string_orc_column->length[i] = string.size; + } + string_orc_column->numElements = rows_num; + break; + } + case TypeIndex::DateTime: + { + orc::TimestampVectorBatch * timestamp_orc_column = dynamic_cast(orc_column); + const auto & timestamp_column = assert_cast(column); + timestamp_orc_column->resize(rows_num); + + for (size_t i = 0; i != rows_num; ++i) + { + if (null_bytemap && (*null_bytemap)[i]) + { + timestamp_orc_column->notNull[i] = 0; + continue; + } + timestamp_orc_column->data[i] = timestamp_column.getElement(i); + timestamp_orc_column->nanoseconds[i] = 0; + } + timestamp_orc_column->numElements = rows_num; + break; + } + case TypeIndex::DateTime64: + { + orc::TimestampVectorBatch * timestamp_orc_column = dynamic_cast(orc_column); + const auto & timestamp_column = assert_cast(column); + const auto * 
timestamp_type = assert_cast(type.get()); + + UInt32 scale = timestamp_type->getScale(); + /* Split ticks with exact integer arithmetic: std::pow works in double and loses precision for tick counts above 2^53 (e.g. nanoseconds since epoch). */ + UInt64 scale_multiplier = 1; + for (UInt32 digit = 0; digit != scale; ++digit) + scale_multiplier *= 10; + /* Nanoseconds per tick; assumes scale is at most 9 so that this is non-zero - TODO confirm against DataTypeDateTime64. */ + const UInt64 nanoseconds_per_tick = 1000000000 / scale_multiplier; + timestamp_orc_column->resize(rows_num); + + for (size_t i = 0; i != rows_num; ++i) + { + if (null_bytemap && (*null_bytemap)[i]) + { + timestamp_orc_column->notNull[i] = 0; + continue; + } + UInt64 value = timestamp_column.getElement(i); + timestamp_orc_column->data[i] = value / scale_multiplier; /* whole seconds */ + timestamp_orc_column->nanoseconds[i] = (value % scale_multiplier) * nanoseconds_per_tick; /* sub-second remainder */ + } + timestamp_orc_column->numElements = rows_num; + break; + } + case TypeIndex::Decimal32: + { + writeDecimals( + orc_column, + column, + type, + null_bytemap, + rows_num, + [](Int32 value){ return value; }); + break; + } + case TypeIndex::Decimal64: + { + writeDecimals( + orc_column, + column, + type, + null_bytemap, + rows_num, + [](Int64 value){ return value; }); + break; + } + case TypeIndex::Decimal128: + { + writeDecimals( + orc_column, + column, + type, + null_bytemap, + rows_num, + [](Int128 value){ return orc::Int128(value >> 64, (value << 64) >> 64); }); /* high 64 bits, low 64 bits */ + break; + } + case TypeIndex::Nullable: + { + /* Unwrap the nullable column and recurse on the nested column, passing its null map along. */ + const auto & nullable_column = assert_cast(column); + const PaddedPODArray & new_null_bytemap = assert_cast &>(*nullable_column.getNullMapColumnPtr()).getData(); + auto nested_type = removeNullable(type); + writeColumn(orc_column, nullable_column.getNestedColumn(), nested_type, &new_null_bytemap, rows_num); + break; + } + /* Doesn't work + case TypeIndex::Array: + { + orc::ListVectorBatch * list_orc_column = dynamic_cast(orc_column); + const auto & list_column = assert_cast(column); + auto nested_type = assert_cast(*type).getNestedType(); + const ColumnArray::Offsets & offsets = list_column.getOffsets(); + list_orc_column->resize(rows_num); + list_orc_column->offsets[0] = 0; + for (size_t i = 0; i != rows_num; ++i) + { + list_orc_column->offsets[i + 1] = offsets[i]; + } + const IColumn & nested_column = list_column.getData(); + orc::ColumnVectorBatch * 
nested_orc_column = list_orc_column->elements.get(); + writeColumn(nested_orc_column, nested_column, nested_type, null_bytemap, nested_column.size()); + list_orc_column->numElements = rows_num; + break; + } + */ + default: + throw Exception("Type " + type->getName() + " is not supported for ORC output format", ErrorCodes::ILLEGAL_COLUMN); + } +} + +void ORCBlockOutputFormat::consume(Chunk chunk) /* Converts one chunk into an ORC row batch, column by column, and adds it to the writer. */ +{ + size_t columns_num = chunk.getNumColumns(); + size_t rows_num = chunk.getNumRows(); + ORC_UNIQUE_PTR batch = writer->createRowBatch(rows_num); + orc::StructVectorBatch *root = dynamic_cast(batch.get()); + for (size_t i = 0; i != columns_num; ++i) + { + writeColumn(root->fields[i], *chunk.getColumns()[i], data_types[i], nullptr, rows_num); + } + root->numElements = rows_num; + writer->add(*batch); +} + +void ORCBlockOutputFormat::finalize() /* Closes the ORC writer. */ +{ + writer->close(); +} + +void registerOutputFormatProcessorORC(FormatFactory & factory) /* Registers the "ORC" output format with the factory. */ +{ + factory.registerOutputFormatProcessor("ORC", []( + WriteBuffer & buf, + const Block & sample, + FormatFactory::WriteCallback, + const FormatSettings & format_settings) + { + return std::make_shared(buf, sample, format_settings); + }); +} + +} diff --git a/src/Processors/Formats/Impl/ORCBlockOutputFormat.h b/src/Processors/Formats/Impl/ORCBlockOutputFormat.h new file mode 100644 index 00000000000..e075169b66f --- /dev/null +++ b/src/Processors/Formats/Impl/ORCBlockOutputFormat.h @@ -0,0 +1,70 @@ +#pragma once + +#include +#include +#include +#include + +namespace DB +{ + +class WriteBuffer; + +class ORCOutputStream : public orc::OutputStream /* orc::OutputStream implementation backed by a WriteBuffer */ +{ +public: + ORCOutputStream(WriteBuffer & out_); + + uint64_t getLength() const override; + uint64_t getNaturalWriteSize() const override; + void write(const void* buf, size_t length) override; + + void close() override {} + const std::string& getName() const override { static const std::string name = "ORCOutputStream"; return name; } /* static storage: the returned reference must stay valid after the call */ + +private: + WriteBuffer & out; +}; + +class ORCBlockOutputFormat : public IOutputFormat +{ +public: + 
ORCBlockOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_); + + String getName() const override { return "ORCBlockOutputFormat"; } + void consume(Chunk chunk) override; + void finalize() override; + + String getContentType() const override { return "application/octet-stream"; } + +private: + ORC_UNIQUE_PTR getORCType(const DataTypePtr & type); + template + void writeDecimals( + orc::ColumnVectorBatch * orc_column, + const IColumn & column, + DataTypePtr & type, + const PaddedPODArray * null_bytemap, + size_t rows_num, + ConvertFunc convert); + template + void writeNumbers( + orc::ColumnVectorBatch * orc_column, + const IColumn & column, + const PaddedPODArray * null_bytemap, + size_t rows_num); + void writeColumn( + orc::ColumnVectorBatch * orc_column, + const IColumn & column, DataTypePtr & type, + const PaddedPODArray * null_bytemap, + size_t rows_num); + + const FormatSettings format_settings; + ORCOutputStream output_stream; + DataTypes data_types; + ORC_UNIQUE_PTR writer; + ORC_UNIQUE_PTR schema; + orc::WriterOptions options; +}; + +} diff --git a/tests/queries/0_stateless/01307_orc_output_format.reference b/tests/queries/0_stateless/01307_orc_output_format.reference new file mode 100644 index 00000000000..bd62476c2df --- /dev/null +++ b/tests/queries/0_stateless/01307_orc_output_format.reference @@ -0,0 +1,6 @@ +255 65535 4294967295 100000000000 -128 -32768 -2147483648 -100000000000 2.02 10000.0000001 String 2021-12-19 2021-12-19 03:00:00 2021-12-19 03:00:00.000 1.0001 1.0000000100 100000.00000000000001000000 1 +4 1234 3244467295 500000000000 -1 -256 -14741221 -7000000000 100.1 14321.032141201 Another string 2024-10-04 2028-04-21 01:20:00 2021-12-19 03:14:51.000 34.1234 123123.1231231230 123123123.12312312312312300000 \N +42 42 42 42 42 42 42 42 42.42 42.42 42 1970-02-12 1970-01-01 03:00:42 0000-00-00 00:00:00.000 42.4200 42.4242424200 424242.42424242424242000000 42 +255 65535 4294967295 100000000000 -128 -32768 
-2147483648 -100000000000 2.02 10000.0000001 String 2021-12-19 2021-12-19 03:00:00 2021-12-19 03:00:00.000 1.0001 1.0000000100 100000.00000000000001000000 1 +4 1234 3244467295 500000000000 -1 -256 -14741221 -7000000000 100.1 14321.032141201 Another string 2024-10-04 2028-04-21 01:20:00 2021-12-19 03:14:51.123 34.1234 123123.1231231230 123123123.12312312312312300000 \N +42 42 42 42 42 42 42 42 42.42 42.42 42 1970-02-12 1970-01-01 03:00:42 1970-01-01 03:00:00.042 42.4200 42.4242424200 424242.42424242424242000000 42 diff --git a/tests/queries/0_stateless/01307_orc_output_format.sh b/tests/queries/0_stateless/01307_orc_output_format.sh new file mode 100755 index 00000000000..8d7e85a03de --- /dev/null +++ b/tests/queries/0_stateless/01307_orc_output_format.sh @@ -0,0 +1,20 @@ +#!/usr/bin/env bash + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +. $CURDIR/../shell_config.sh + +$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS orc"; + +$CLICKHOUSE_CLIENT --query="CREATE TABLE orc (uint8 UInt8, uint16 UInt16, uint32 UInt32, uint64 UInt64, int8 Int8, int16 Int16, int32 Int32, int64 Int64, float Float32, double Float64, string String, date Date, datetime DateTime, datetime64 DateTime64, decimal32 Decimal32(4), decimal64 Decimal64(10), decimal128 Decimal128(20), nullable Nullable(Int32)) ENGINE = Memory"; + +$CLICKHOUSE_CLIENT --query="INSERT INTO orc VALUES (255, 65535, 4294967295, 100000000000, -128, -32768, -2147483648, -100000000000, 2.02, 10000.0000001, 'String', 18980, 1639872000, 1639872000000, 1.0001, 1.00000001, 100000.00000000000001, 1), (4, 1234, 3244467295, 500000000000, -1, -256, -14741221, -7000000000, 100.1, 14321.032141201, 'Another string', 20000, 1839882000, 1639872891123, 34.1234, 123123.123123123, 123123123.123123123123123, NULL), (42, 42, 42, 42, 42, 42, 42, 42, 42.42, 42.42, '42', 42, 42, 42, 42.42, 42.42424242, 424242.42424242424242, 42)"; + +$CLICKHOUSE_CLIENT --query="SELECT * FROM orc FORMAT ORC" > $CURDIR/tmp_orc_test_all_types.orc; + +cat 
$CURDIR/tmp_orc_test_all_types.orc | $CLICKHOUSE_CLIENT --query="INSERT INTO orc FORMAT ORC"; + +rm $CURDIR/tmp_orc_test_all_types.orc + +$CLICKHOUSE_CLIENT --query="SELECT * FROM orc"; + +$CLICKHOUSE_CLIENT --query="DROP TABLE orc";