mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 23:21:59 +00:00
Merge pull request #11662 from Avogar/orc_output_format
Add ORCBlockOutputFormat
This commit is contained in:
commit
53522c728b
@ -86,7 +86,10 @@ static INLINE void memcpy_sse2_128(void *dst, const void *src) {
|
||||
//---------------------------------------------------------------------
|
||||
// tiny memory copy with jump table optimized
|
||||
//---------------------------------------------------------------------
|
||||
static INLINE void *memcpy_tiny(void *dst, const void *src, size_t size) {
|
||||
/// Attribute is used to avoid an error with undefined behaviour sanitizer
|
||||
/// ../contrib/FastMemcpy/FastMemcpy.h:91:56: runtime error: applying zero offset to null pointer
|
||||
/// Found by 01307_orc_output_format.sh, cause - ORCBlockInputFormat and external ORC library.
|
||||
__attribute__((__no_sanitize__("undefined"))) static INLINE void *memcpy_tiny(void *dst, const void *src, size_t size) {
|
||||
unsigned char *dd = ((unsigned char*)dst) + size;
|
||||
const unsigned char *ss = ((const unsigned char*)src) + size;
|
||||
|
||||
|
@ -372,6 +372,14 @@ target_include_directories (clickhouse_common_io SYSTEM BEFORE PUBLIC ${DOUBLE_C
|
||||
|
||||
target_include_directories (clickhouse_common_io SYSTEM BEFORE PUBLIC ${MSGPACK_INCLUDE_DIR})
|
||||
|
||||
if (USE_ORC)
    # Add the ORC headers to the PUBLIC system include path of clickhouse_common_io.
    target_include_directories (clickhouse_common_io SYSTEM BEFORE PUBLIC ${ORC_INCLUDE_DIR})

    # Generate orc-config.hh from its .in template, in the same orc/ directory.
    configure_file ("${ORC_INCLUDE_DIR}/orc/orc-config.hh.in" "${ORC_INCLUDE_DIR}/orc/orc-config.hh")
endif ()
|
||||
|
||||
if (ENABLE_TESTS AND USE_GTEST)
|
||||
macro (grep_gtest_sources BASE_DIR DST_VAR)
|
||||
# Could match files that are not in tests/ directories
|
||||
|
@ -362,6 +362,7 @@ FormatFactory::FormatFactory()
|
||||
#if !defined(ARCADIA_BUILD)
|
||||
registerInputFormatProcessorCapnProto(*this);
|
||||
registerInputFormatProcessorORC(*this);
|
||||
registerOutputFormatProcessorORC(*this);
|
||||
registerInputFormatProcessorParquet(*this);
|
||||
registerOutputFormatProcessorParquet(*this);
|
||||
registerInputFormatProcessorArrow(*this);
|
||||
|
@ -175,6 +175,9 @@ void registerInputFormatProcessorTemplate(FormatFactory & factory);
|
||||
void registerOutputFormatProcessorTemplate(FormatFactory & factory);
|
||||
void registerInputFormatProcessorMsgPack(FormatFactory & factory);
|
||||
void registerOutputFormatProcessorMsgPack(FormatFactory & factory);
|
||||
void registerInputFormatProcessorORC(FormatFactory & factory);
|
||||
void registerOutputFormatProcessorORC(FormatFactory & factory);
|
||||
|
||||
|
||||
/// File Segmentation Engines for parallel reading
|
||||
|
||||
@ -207,6 +210,5 @@ void registerOutputFormatProcessorPostgreSQLWire(FormatFactory & factory);
|
||||
void registerInputFormatProcessorCapnProto(FormatFactory & factory);
|
||||
void registerInputFormatProcessorRegexp(FormatFactory & factory);
|
||||
void registerInputFormatProcessorJSONAsString(FormatFactory & factory);
|
||||
void registerInputFormatProcessorORC(FormatFactory & factory);
|
||||
|
||||
}
|
||||
|
455
src/Processors/Formats/Impl/ORCBlockOutputFormat.cpp
Normal file
455
src/Processors/Formats/Impl/ORCBlockOutputFormat.cpp
Normal file
@ -0,0 +1,455 @@
|
||||
#include <Processors/Formats/Impl/ORCBlockOutputFormat.h>
|
||||
|
||||
#if USE_ORC
|
||||
|
||||
#include <Common/assert_cast.h>
|
||||
#include <Formats/FormatFactory.h>
|
||||
|
||||
#include <Columns/ColumnFixedString.h>
|
||||
#include <Columns/ColumnNullable.h>
|
||||
#include <Columns/ColumnVector.h>
|
||||
#include <Columns/ColumnArray.h>
|
||||
#include <Columns/ColumnString.h>
|
||||
|
||||
#include <DataTypes/DataTypeDateTime.h>
|
||||
#include <DataTypes/DataTypeDateTime64.h>
|
||||
#include <DataTypes/DataTypeNullable.h>
|
||||
#include <DataTypes/DataTypesDecimal.h>
|
||||
#include <DataTypes/DataTypeArray.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int ILLEGAL_COLUMN;
|
||||
}
|
||||
|
||||
/// Binds the stream to an existing WriteBuffer; the buffer is referenced, not copied.
ORCOutputStream::ORCOutputStream(WriteBuffer & out_)
    : out(out_)
{
}
|
||||
|
||||
uint64_t ORCOutputStream::getLength() const
|
||||
{
|
||||
return out.count();
|
||||
}
|
||||
|
||||
uint64_t ORCOutputStream::getNaturalWriteSize() const
|
||||
{
|
||||
out.nextIfAtEnd();
|
||||
return out.available();
|
||||
}
|
||||
|
||||
void ORCOutputStream::write(const void* buf, size_t length)
|
||||
{
|
||||
out.write(static_cast<const char *>(buf), length);
|
||||
}
|
||||
|
||||
/// Builds the ORC struct schema from the header (one field per column, in header order)
/// and creates an uncompressed orc::Writer that writes into output_stream.
ORCBlockOutputFormat::ORCBlockOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_)
    : IOutputFormat(header_, out_)
    , format_settings{format_settings_}
    , output_stream(out_)
    , data_types(header_.getDataTypes())
{
    options.setCompression(orc::CompressionKind::CompressionKind_NONE);

    schema = orc::createStructType();
    const size_t total_columns = header_.columns();
    for (size_t pos = 0; pos < total_columns; ++pos)
    {
        const auto & header_column = header_.safeGetByPosition(pos);
        schema->addStructField(header_column.name, getORCType(data_types[pos]));
    }

    writer = orc::createWriter(*schema, &output_stream, options);
}
|
||||
|
||||
/// Maps a ClickHouse data type to the ORC type used for it in the output schema.
///
/// Signed and unsigned integers of the same width share one ORC kind, both string types become
/// BINARY, both DateTime types become TIMESTAMP, Nullable is unwrapped and Array becomes a LIST
/// of the mapped nested type. Throws ILLEGAL_COLUMN for any type not listed in the switch.
ORC_UNIQUE_PTR<orc::Type> ORCBlockOutputFormat::getORCType(const DataTypePtr & type)
{
    switch (type->getTypeId())
    {
        case TypeIndex::UInt8:
        case TypeIndex::Int8:
            return orc::createPrimitiveType(orc::TypeKind::BYTE);

        case TypeIndex::UInt16:
        case TypeIndex::Int16:
            return orc::createPrimitiveType(orc::TypeKind::SHORT);

        case TypeIndex::UInt32:
        case TypeIndex::Int32:
            return orc::createPrimitiveType(orc::TypeKind::INT);

        case TypeIndex::UInt64:
        case TypeIndex::Int64:
            return orc::createPrimitiveType(orc::TypeKind::LONG);

        case TypeIndex::Float32:
            return orc::createPrimitiveType(orc::TypeKind::FLOAT);

        case TypeIndex::Float64:
            return orc::createPrimitiveType(orc::TypeKind::DOUBLE);

        case TypeIndex::Date:
            return orc::createPrimitiveType(orc::TypeKind::DATE);

        case TypeIndex::DateTime:
        case TypeIndex::DateTime64:
            return orc::createPrimitiveType(orc::TypeKind::TIMESTAMP);

        case TypeIndex::FixedString:
        case TypeIndex::String:
            return orc::createPrimitiveType(orc::TypeKind::BINARY);

        case TypeIndex::Nullable:
            /// Nullability is not part of the ORC type; only the nested type is mapped.
            return getORCType(removeNullable(type));

        case TypeIndex::Array:
        {
            const auto * as_array = typeid_cast<const DataTypeArray *>(type.get());
            return orc::createListType(getORCType(as_array->getNestedType()));
        }

        case TypeIndex::Decimal32:
        {
            const auto * as_decimal32 = typeid_cast<const DataTypeDecimal<Decimal32> *>(type.get());
            return orc::createDecimalType(as_decimal32->getPrecision(), as_decimal32->getScale());
        }

        case TypeIndex::Decimal64:
        {
            const auto * as_decimal64 = typeid_cast<const DataTypeDecimal<Decimal64> *>(type.get());
            return orc::createDecimalType(as_decimal64->getPrecision(), as_decimal64->getScale());
        }

        case TypeIndex::Decimal128:
        {
            const auto * as_decimal128 = typeid_cast<const DataTypeDecimal<Decimal128> *>(type.get());
            return orc::createDecimalType(as_decimal128->getPrecision(), as_decimal128->getScale());
        }

        default:
            break;
    }

    throw Exception("Type " + type->getName() + " is not supported for ORC output format", ErrorCodes::ILLEGAL_COLUMN);
}
|
||||
|
||||
template <typename NumberType, typename NumberVectorBatch, typename ConvertFunc>
|
||||
void ORCBlockOutputFormat::writeNumbers(
|
||||
orc::ColumnVectorBatch * orc_column,
|
||||
const IColumn & column,
|
||||
const PaddedPODArray<UInt8> * null_bytemap,
|
||||
ConvertFunc convert)
|
||||
{
|
||||
NumberVectorBatch * number_orc_column = dynamic_cast<NumberVectorBatch *>(orc_column);
|
||||
const auto & number_column = assert_cast<const ColumnVector<NumberType> &>(column);
|
||||
number_orc_column->resize(number_column.size());
|
||||
|
||||
for (size_t i = 0; i != number_column.size(); ++i)
|
||||
{
|
||||
if (null_bytemap && (*null_bytemap)[i])
|
||||
{
|
||||
number_orc_column->notNull[i] = 0;
|
||||
continue;
|
||||
}
|
||||
number_orc_column->data[i] = convert(number_column.getElement(i));
|
||||
}
|
||||
number_orc_column->numElements = number_column.size();
|
||||
}
|
||||
|
||||
template <typename Decimal, typename DecimalVectorBatch, typename ConvertFunc>
|
||||
void ORCBlockOutputFormat::writeDecimals(
|
||||
orc::ColumnVectorBatch * orc_column,
|
||||
const IColumn & column,
|
||||
DataTypePtr & type,
|
||||
const PaddedPODArray<UInt8> * null_bytemap,
|
||||
ConvertFunc convert)
|
||||
{
|
||||
DecimalVectorBatch *decimal_orc_column = dynamic_cast<DecimalVectorBatch *>(orc_column);
|
||||
const auto & decimal_column = assert_cast<const ColumnDecimal<Decimal> &>(column);
|
||||
const auto * decimal_type = typeid_cast<const DataTypeDecimal<Decimal> *>(type.get());
|
||||
decimal_orc_column->precision = decimal_type->getPrecision();
|
||||
decimal_orc_column->scale = decimal_type->getScale();
|
||||
decimal_orc_column->resize(decimal_column.size());
|
||||
for (size_t i = 0; i != decimal_column.size(); ++i)
|
||||
{
|
||||
if (null_bytemap && (*null_bytemap)[i])
|
||||
{
|
||||
decimal_orc_column->notNull[i] = 0;
|
||||
continue;
|
||||
}
|
||||
decimal_orc_column->values[i] = convert(decimal_column.getElement(i).value);
|
||||
}
|
||||
decimal_orc_column->numElements = decimal_column.size();
|
||||
}
|
||||
|
||||
template <typename ColumnType>
|
||||
void ORCBlockOutputFormat::writeStrings(
|
||||
orc::ColumnVectorBatch * orc_column,
|
||||
const IColumn & column,
|
||||
const PaddedPODArray<UInt8> * null_bytemap)
|
||||
{
|
||||
orc::StringVectorBatch * string_orc_column = dynamic_cast<orc::StringVectorBatch *>(orc_column);
|
||||
const auto & string_column = assert_cast<const ColumnType &>(column);
|
||||
string_orc_column->resize(string_column.size());
|
||||
|
||||
for (size_t i = 0; i != string_column.size(); ++i)
|
||||
{
|
||||
if (null_bytemap && (*null_bytemap)[i])
|
||||
{
|
||||
string_orc_column->notNull[i] = 0;
|
||||
continue;
|
||||
}
|
||||
const StringRef & string = string_column.getDataAt(i);
|
||||
string_orc_column->data[i] = const_cast<char *>(string.data);
|
||||
string_orc_column->length[i] = string.size;
|
||||
}
|
||||
string_orc_column->numElements = string_column.size();
|
||||
}
|
||||
|
||||
template <typename ColumnType, typename GetSecondsFunc, typename GetNanosecondsFunc>
|
||||
void ORCBlockOutputFormat::writeDateTimes(
|
||||
orc::ColumnVectorBatch * orc_column,
|
||||
const IColumn & column,
|
||||
const PaddedPODArray<UInt8> * null_bytemap,
|
||||
GetSecondsFunc get_seconds,
|
||||
GetNanosecondsFunc get_nanoseconds)
|
||||
{
|
||||
orc::TimestampVectorBatch * timestamp_orc_column = dynamic_cast<orc::TimestampVectorBatch *>(orc_column);
|
||||
const auto & timestamp_column = assert_cast<const ColumnType &>(column);
|
||||
timestamp_orc_column->resize(timestamp_column.size());
|
||||
|
||||
for (size_t i = 0; i != timestamp_column.size(); ++i)
|
||||
{
|
||||
if (null_bytemap && (*null_bytemap)[i])
|
||||
{
|
||||
timestamp_orc_column->notNull[i] = 0;
|
||||
continue;
|
||||
}
|
||||
timestamp_orc_column->data[i] = get_seconds(timestamp_column.getElement(i));
|
||||
timestamp_orc_column->nanoseconds[i] = get_nanoseconds(timestamp_column.getElement(i));
|
||||
}
|
||||
timestamp_orc_column->numElements = timestamp_column.size();
|
||||
}
|
||||
|
||||
/// Writes one ClickHouse column into the matching ORC batch, dispatching on the column's data type.
///
/// orc_column   - destination batch for this column.
/// column       - source column.
/// type         - data type of `column`.
/// null_bytemap - null map when `column` is the nested column of a Nullable (non-zero byte means
///                NULL row), nullptr otherwise.
///
/// Nullable and Array are handled by recursion into the nested column.
/// Throws ILLEGAL_COLUMN for types that have no case below.
void ORCBlockOutputFormat::writeColumn(
    orc::ColumnVectorBatch * orc_column,
    const IColumn & column,
    DataTypePtr & type,
    const PaddedPODArray<UInt8> * null_bytemap)
{
    if (null_bytemap)
    {
        orc_column->hasNulls = true;
        orc_column->notNull.resize(column.size());
    }

    switch (type->getTypeId())
    {
        case TypeIndex::Int8:
        {
            /// Note: Explicit cast to avoid clang-tidy error: 'signed char' to 'long' conversion; consider casting to 'unsigned char' first.
            writeNumbers<Int8, orc::LongVectorBatch>(orc_column, column, null_bytemap, [](const Int8 & value){ return static_cast<int64_t>(value); });
            break;
        }
        case TypeIndex::UInt8:
        {
            writeNumbers<UInt8, orc::LongVectorBatch>(orc_column, column, null_bytemap, [](const UInt8 & value){ return value; });
            break;
        }
        case TypeIndex::Int16:
        {
            writeNumbers<Int16, orc::LongVectorBatch>(orc_column, column, null_bytemap, [](const Int16 & value){ return value; });
            break;
        }
        /// Date columns take the same path as UInt16 columns.
        case TypeIndex::Date: [[fallthrough]];
        case TypeIndex::UInt16:
        {
            writeNumbers<UInt16, orc::LongVectorBatch>(orc_column, column, null_bytemap, [](const UInt16 & value){ return value; });
            break;
        }
        case TypeIndex::Int32:
        {
            writeNumbers<Int32, orc::LongVectorBatch>(orc_column, column, null_bytemap, [](const Int32 & value){ return value; });
            break;
        }
        case TypeIndex::UInt32:
        {
            writeNumbers<UInt32, orc::LongVectorBatch>(orc_column, column, null_bytemap, [](const UInt32 & value){ return value; });
            break;
        }
        case TypeIndex::Int64:
        {
            writeNumbers<Int64, orc::LongVectorBatch>(orc_column, column, null_bytemap, [](const Int64 & value){ return value; });
            break;
        }
        case TypeIndex::UInt64:
        {
            writeNumbers<UInt64, orc::LongVectorBatch>(orc_column, column, null_bytemap, [](const UInt64 & value){ return value; });
            break;
        }
        case TypeIndex::Float32:
        {
            writeNumbers<Float32, orc::DoubleVectorBatch>(orc_column, column, null_bytemap, [](const Float32 & value){ return value; });
            break;
        }
        case TypeIndex::Float64:
        {
            writeNumbers<Float64, orc::DoubleVectorBatch>(orc_column, column, null_bytemap, [](const Float64 & value){ return value; });
            break;
        }
        case TypeIndex::FixedString:
        {
            writeStrings<ColumnFixedString>(orc_column, column, null_bytemap);
            break;
        }
        case TypeIndex::String:
        {
            writeStrings<ColumnString>(orc_column, column, null_bytemap);
            break;
        }
        case TypeIndex::DateTime:
        {
            /// DateTime holds whole seconds, so the nanoseconds part is always 0.
            writeDateTimes<ColumnUInt32>(
                orc_column,
                column, null_bytemap,
                [](UInt32 value){ return value; },
                [](UInt32){ return 0; });
            break;
        }
        case TypeIndex::DateTime64:
        {
            const auto * timestamp_type = assert_cast<const DataTypeDateTime64 *>(type.get());
            const UInt32 scale = timestamp_type->getScale();

            /// Integer powers of ten (assumes scale <= 9, i.e. at most nanosecond resolution).
            /// Dividing in floating point can round the quotient up to the next whole second
            /// when the fractional part is close to one, while the nanoseconds are computed
            /// exactly - the two parts would then disagree by a full second.
            UInt64 ticks_per_second = 1;
            for (UInt32 digit = 0; digit < scale; ++digit)
                ticks_per_second *= 10;
            UInt64 nanoseconds_per_tick = 1;
            for (UInt32 digit = scale; digit < 9; ++digit)
                nanoseconds_per_tick *= 10;

            writeDateTimes<DataTypeDateTime64::ColumnType>(
                orc_column,
                column, null_bytemap,
                [ticks_per_second](UInt64 value){ return value / ticks_per_second; },
                [ticks_per_second, nanoseconds_per_tick](UInt64 value){ return (value % ticks_per_second) * nanoseconds_per_tick; });
            break;
        }
        case TypeIndex::Decimal32:
        {
            writeDecimals<Decimal32, orc::Decimal64VectorBatch>(
                orc_column,
                column,
                type,
                null_bytemap,
                [](Int32 value){ return value; });
            break;
        }
        case TypeIndex::Decimal64:
        {
            writeDecimals<Decimal64, orc::Decimal64VectorBatch>(
                orc_column,
                column,
                type,
                null_bytemap,
                [](Int64 value){ return value; });
            break;
        }
        case TypeIndex::Decimal128:
        {
            /// Split the 128-bit value into the (high, low) 64-bit halves passed to orc::Int128.
            /// The low half is taken with a cast rather than by shifting left and back,
            /// because left-shifting a negative signed value is undefined before C++20.
            writeDecimals<Decimal128, orc::Decimal128VectorBatch>(
                orc_column,
                column,
                type,
                null_bytemap,
                [](Int128 value){ return orc::Int128(static_cast<int64_t>(value >> 64), static_cast<uint64_t>(value)); });
            break;
        }
        case TypeIndex::Nullable:
        {
            /// Unwrap: write the nested column and hand over the Nullable column's own null map.
            const auto & nullable_column = assert_cast<const ColumnNullable &>(column);
            const PaddedPODArray<UInt8> & new_null_bytemap = assert_cast<const ColumnVector<UInt8> &>(*nullable_column.getNullMapColumnPtr()).getData();
            auto nested_type = removeNullable(type);
            writeColumn(orc_column, nullable_column.getNestedColumn(), nested_type, &new_null_bytemap);
            break;
        }
        case TypeIndex::Array:
        {
            /// Cast to a reference: a batch of an unexpected type raises std::bad_cast
            /// instead of yielding a null pointer that would be dereferenced below.
            auto & list_orc_column = dynamic_cast<orc::ListVectorBatch &>(*orc_column);
            const auto & list_column = assert_cast<const ColumnArray &>(column);
            auto nested_type = assert_cast<const DataTypeArray &>(*type).getNestedType();
            const ColumnArray::Offsets & offsets = list_column.getOffsets();

            const size_t rows = list_column.size();
            list_orc_column.resize(rows);

            /// The length of list i in ListVectorBatch is offsets[i+1] - offsets[i].
            list_orc_column.offsets[0] = 0;
            for (size_t i = 0; i != rows; ++i)
                list_orc_column.offsets[i + 1] = offsets[i];

            /// The incoming null map has one entry per array row, whereas the nested column has
            /// one entry per array element, so it must not be applied to the nested column.
            /// A Nullable nested type brings its own null map through the Nullable case.
            orc::ColumnVectorBatch * nested_orc_column = list_orc_column.elements.get();
            writeColumn(nested_orc_column, list_column.getData(), nested_type, nullptr);

            list_orc_column.numElements = rows;
            break;
        }
        default:
            throw Exception("Type " + type->getName() + " is not supported for ORC output format", ErrorCodes::ILLEGAL_COLUMN);
    }
}
|
||||
|
||||
size_t ORCBlockOutputFormat::getColumnSize(const IColumn & column, DataTypePtr & type)
|
||||
{
|
||||
if (type->getTypeId() == TypeIndex::Array)
|
||||
{
|
||||
auto nested_type = assert_cast<const DataTypeArray &>(*type).getNestedType();
|
||||
const IColumn & nested_column = assert_cast<const ColumnArray &>(column).getData();
|
||||
return getColumnSize(nested_column, nested_type);
|
||||
}
|
||||
return column.size();
|
||||
}
|
||||
|
||||
/// Returns the largest getColumnSize() over all columns of the chunk (0 for a chunk without columns).
size_t ORCBlockOutputFormat::getMaxColumnSize(Chunk & chunk)
{
    size_t largest = 0;
    const size_t total_columns = chunk.getNumColumns();
    for (size_t pos = 0; pos < total_columns; ++pos)
    {
        const size_t current = getColumnSize(*chunk.getColumns()[pos], data_types[pos]);
        if (largest < current)
            largest = current;
    }
    return largest;
}
|
||||
|
||||
/// Converts one chunk into an ORC row batch and adds it to the writer.
void ORCBlockOutputFormat::consume(Chunk chunk)
{
    const size_t columns_num = chunk.getNumColumns();
    const size_t rows_num = chunk.getNumRows();

    /// getMaxColumnSize is needed to write arrays.
    /// The size of the batch must be no less than total amount of array elements.
    ORC_UNIQUE_PTR<orc::ColumnVectorBatch> batch = writer->createRowBatch(getMaxColumnSize(chunk));

    /// Cast to a reference: if the batch is not a struct batch this raises std::bad_cast
    /// instead of yielding a null pointer that would be dereferenced below.
    auto & root = dynamic_cast<orc::StructVectorBatch &>(*batch);

    const auto & columns = chunk.getColumns();
    for (size_t i = 0; i != columns_num; ++i)
        writeColumn(root.fields[i], *columns[i], data_types[i], nullptr);

    root.numElements = rows_num;
    writer->add(*batch);
}
|
||||
|
||||
void ORCBlockOutputFormat::finalize()
|
||||
{
|
||||
writer->close();
|
||||
}
|
||||
|
||||
/// Registers the output format under the name "ORC".
void registerOutputFormatProcessorORC(FormatFactory & factory)
{
    /// The write callback argument is accepted but not used by this format.
    auto create_orc_output = [](
        WriteBuffer & buf,
        const Block & sample,
        FormatFactory::WriteCallback,
        const FormatSettings & format_settings)
    {
        return std::make_shared<ORCBlockOutputFormat>(buf, sample, format_settings);
    };

    factory.registerOutputFormatProcessor("ORC", create_orc_output);
}
|
||||
|
||||
}
|
||||
|
||||
#else
|
||||
|
||||
namespace DB
{

class FormatFactory;

/// Variant compiled when USE_ORC is off: the function exists but registers nothing.
void registerOutputFormatProcessorORC(FormatFactory & /*factory*/)
{
}

}
|
||||
|
||||
#endif
|
83
src/Processors/Formats/Impl/ORCBlockOutputFormat.h
Normal file
83
src/Processors/Formats/Impl/ORCBlockOutputFormat.h
Normal file
@ -0,0 +1,83 @@
|
||||
#pragma once
|
||||
|
||||
#if !defined(ARCADIA_BUILD)
|
||||
#include "config_formats.h"
|
||||
#endif
|
||||
|
||||
#if USE_ORC
|
||||
#include <IO/WriteBuffer.h>
|
||||
#include <Processors/Formats/IOutputFormat.h>
|
||||
#include <Formats/FormatSettings.h>
|
||||
#include <orc/OrcFile.hh>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class WriteBuffer;
|
||||
|
||||
/// orc::Writer writes only in orc::OutputStream
|
||||
class ORCOutputStream : public orc::OutputStream
|
||||
{
|
||||
public:
|
||||
ORCOutputStream(WriteBuffer & out_);
|
||||
|
||||
uint64_t getLength() const override;
|
||||
uint64_t getNaturalWriteSize() const override;
|
||||
void write(const void* buf, size_t length) override;
|
||||
|
||||
void close() override {}
|
||||
const std::string& getName() const override { return name; }
|
||||
|
||||
private:
|
||||
WriteBuffer & out;
|
||||
std::string name = "ORCOutputStream";
|
||||
};
|
||||
|
||||
class ORCBlockOutputFormat : public IOutputFormat
|
||||
{
|
||||
public:
|
||||
ORCBlockOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_);
|
||||
|
||||
String getName() const override { return "ORCBlockOutputFormat"; }
|
||||
void consume(Chunk chunk) override;
|
||||
void finalize() override;
|
||||
|
||||
private:
|
||||
ORC_UNIQUE_PTR<orc::Type> getORCType(const DataTypePtr & type);
|
||||
|
||||
/// ConvertFunc is needed for type UInt8, because firstly UInt8 (char8_t) must be
|
||||
/// converted to unsigned char (bugprone-signed-char-misuse in clang).
|
||||
template <typename NumberType, typename NumberVectorBatch, typename ConvertFunc>
|
||||
void writeNumbers(orc::ColumnVectorBatch * orc_column, const IColumn & column, const PaddedPODArray<UInt8> * null_bytemap, ConvertFunc convert);
|
||||
|
||||
/// ConvertFunc is needed to convert ClickHouse Int128 to ORC Int128.
|
||||
template <typename Decimal, typename DecimalVectorBatch, typename ConvertFunc>
|
||||
void writeDecimals(orc::ColumnVectorBatch * orc_column, const IColumn & column, DataTypePtr & type,
|
||||
const PaddedPODArray<UInt8> * null_bytemap, ConvertFunc convert);
|
||||
|
||||
template <typename ColumnType>
|
||||
void writeStrings(orc::ColumnVectorBatch * orc_column, const IColumn & column, const PaddedPODArray<UInt8> * null_bytemap);
|
||||
|
||||
/// ORC column TimestampVectorBatch stores only seconds and nanoseconds,
|
||||
/// GetSecondsFunc and GetNanosecondsFunc are needed to extract them from DataTime type.
|
||||
template <typename ColumnType, typename GetSecondsFunc, typename GetNanosecondsFunc>
|
||||
void writeDateTimes(orc::ColumnVectorBatch * orc_column, const IColumn & column, const PaddedPODArray<UInt8> * null_bytemap,
|
||||
GetSecondsFunc get_seconds, GetNanosecondsFunc get_nanoseconds);
|
||||
|
||||
void writeColumn(orc::ColumnVectorBatch * orc_column, const IColumn & column, DataTypePtr & type, const PaddedPODArray<UInt8> * null_bytemap);
|
||||
|
||||
/// These two functions are needed to know maximum nested size of arrays to
|
||||
/// create an ORC Batch with the appropriate size
|
||||
size_t getColumnSize(const IColumn & column, DataTypePtr & type);
|
||||
size_t getMaxColumnSize(Chunk & chunk);
|
||||
|
||||
const FormatSettings format_settings;
|
||||
ORCOutputStream output_stream;
|
||||
DataTypes data_types;
|
||||
ORC_UNIQUE_PTR<orc::Writer> writer;
|
||||
ORC_UNIQUE_PTR<orc::Type> schema;
|
||||
orc::WriterOptions options;
|
||||
};
|
||||
|
||||
}
|
||||
#endif
|
@ -35,6 +35,7 @@
|
||||
<value>ODBCDriver2</value>
|
||||
<value>Avro</value>
|
||||
<value>MsgPack</value>
|
||||
<value>ORC</value>
|
||||
</values>
|
||||
</substitution>
|
||||
</substitutions>
|
||||
|
@ -0,0 +1,6 @@
|
||||
255 65535 4294967295 100000000000 -128 -32768 -2147483648 -100000000000 2.02 10000.0000001 String 2020 2021-12-19 2021-12-19 03:00:00 1.0001 1.0000000100 100000.00000000000001000000 1
|
||||
4 1234 3244467295 500000000000 -1 -256 -14741221 -7000000000 100.1 14321.032141201 Another string 2000 2024-10-04 2028-04-21 01:20:00 34.1234 123123.1231231230 123123123.12312312312312300000 \N
|
||||
42 42 42 42 42 42 42 42 42.42 42.42 42 4242 1970-02-12 1970-01-01 03:00:42 42.4200 42.4242424200 424242.42424242424242000000 42
|
||||
255 65535 4294967295 100000000000 -128 -32768 -2147483648 -100000000000 2.02 10000.0000001 String 2020 2021-12-19 2021-12-19 03:00:00 1.0001 1.0000000100 100000.00000000000001000000 1
|
||||
4 1234 3244467295 500000000000 -1 -256 -14741221 -7000000000 100.1 14321.032141201 Another string 2000 2024-10-04 2028-04-21 01:20:00 34.1234 123123.1231231230 123123123.12312312312312300000 \N
|
||||
42 42 42 42 42 42 42 42 42.42 42.42 42 4242 1970-02-12 1970-01-01 03:00:42 42.4200 42.4242424200 424242.42424242424242000000 42
|
20
tests/queries/0_stateless/01307_orc_output_format.sh
Executable file
20
tests/queries/0_stateless/01307_orc_output_format.sh
Executable file
@ -0,0 +1,20 @@
|
||||
#!/usr/bin/env bash

# Round trip for the ORC output format: fill a table with one column per supported type,
# dump it as ORC into a temporary file, insert that file back into the same table and
# print the table (so every row is expected twice in the output).

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# Paths are quoted so the test also works from a directory whose name contains spaces.
. "$CURDIR"/../shell_config.sh

ORC_FILE="$CURDIR/tmp_orc_test_all_types.orc"

$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS orc";

$CLICKHOUSE_CLIENT --query="CREATE TABLE orc (uint8 UInt8, uint16 UInt16, uint32 UInt32, uint64 UInt64, int8 Int8, int16 Int16, int32 Int32, int64 Int64, float Float32, double Float64, string String, fixed FixedString(4), date Date, datetime DateTime, decimal32 Decimal32(4), decimal64 Decimal64(10), decimal128 Decimal128(20), nullable Nullable(Int32)) ENGINE = Memory";

$CLICKHOUSE_CLIENT --query="INSERT INTO orc VALUES (255, 65535, 4294967295, 100000000000, -128, -32768, -2147483648, -100000000000, 2.02, 10000.0000001, 'String', '2020', 18980, 1639872000, 1.0001, 1.00000001, 100000.00000000000001, 1), (4, 1234, 3244467295, 500000000000, -1, -256, -14741221, -7000000000, 100.1, 14321.032141201, 'Another string', '2000', 20000, 1839882000, 34.1234, 123123.123123123, 123123123.123123123123123, NULL), (42, 42, 42, 42, 42, 42, 42, 42, 42.42, 42.42, '42', '4242', 42, 42, 42.42, 42.42424242, 424242.42424242424242, 42)";

$CLICKHOUSE_CLIENT --query="SELECT * FROM orc FORMAT ORC" > "$ORC_FILE";

# Feed the file through stdin directly instead of through an extra cat process.
$CLICKHOUSE_CLIENT --query="INSERT INTO orc FORMAT ORC" < "$ORC_FILE";

rm "$ORC_FILE"

$CLICKHOUSE_CLIENT --query="SELECT * FROM orc";

$CLICKHOUSE_CLIENT --query="DROP TABLE orc";
|
Binary file not shown.
15
tests/queries/0_stateless/01308_orc_output_format_arrays.sh
Executable file
15
tests/queries/0_stateless/01308_orc_output_format_arrays.sh
Executable file
@ -0,0 +1,15 @@
|
||||
#!/usr/bin/env bash

# Writes a table with Array and Array(Array) columns in ORC format to stdout.

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# The path is quoted so the test also works from a directory whose name contains spaces.
. "$CURDIR"/../shell_config.sh

$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS orc";

$CLICKHOUSE_CLIENT --query="CREATE TABLE orc (array1 Array(Int32), array2 Array(Array(Int32))) ENGINE = Memory";

$CLICKHOUSE_CLIENT --query="INSERT INTO orc VALUES ([1,2,3,4,5], [[1,2], [3,4], [5]]), ([42], [[42, 42], [42]])";

$CLICKHOUSE_CLIENT --query="SELECT * FROM orc FORMAT ORC";

$CLICKHOUSE_CLIENT --query="DROP TABLE orc";
|
||||
|
@ -57,6 +57,7 @@
|
||||
01044_h3_edge_angle
|
||||
01046_materialized_view_with_join_over_distributed
|
||||
01050_clickhouse_dict_source_with_subquery
|
||||
01053_ssd_dictionary
|
||||
01059_storage_file_brotli
|
||||
01070_h3_get_base_cell
|
||||
01070_h3_hex_area_m2
|
||||
@ -114,6 +115,7 @@
|
||||
01273_h3EdgeAngle_range_check
|
||||
01274_alter_rename_column_distributed
|
||||
01276_system_licenses
|
||||
01280_ssd_complex_key_dictionary
|
||||
01291_distributed_low_cardinality_memory_efficient
|
||||
01292_create_user
|
||||
01293_show_clusters
|
||||
@ -121,9 +123,9 @@
|
||||
01294_system_distributed_on_cluster
|
||||
01297_alter_distributed
|
||||
01303_aggregate_function_nothing_serde
|
||||
01307_orc_output_format
|
||||
01308_orc_output_format_arrays
|
||||
01319_query_formatting_in_server_log
|
||||
01326_build_id
|
||||
01053_ssd_dictionary
|
||||
01280_ssd_complex_key_dictionary
|
||||
01354_order_by_tuple_collate_const
|
||||
01370_client_autocomplete_word_break_characters
|
||||
|
Loading…
Reference in New Issue
Block a user