mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 23:21:59 +00:00
Support structs in Arrow/Parquet/ORC
This commit is contained in:
parent
be11ff0820
commit
235e3e2f5b
@ -7,15 +7,17 @@
|
|||||||
#include <DataTypes/DataTypesDecimal.h>
|
#include <DataTypes/DataTypesDecimal.h>
|
||||||
#include <DataTypes/DataTypesNumber.h>
|
#include <DataTypes/DataTypesNumber.h>
|
||||||
#include <DataTypes/DataTypeArray.h>
|
#include <DataTypes/DataTypeArray.h>
|
||||||
|
#include <DataTypes/DataTypeLowCardinality.h>
|
||||||
|
#include <DataTypes/DataTypeTuple.h>
|
||||||
#include <common/DateLUTImpl.h>
|
#include <common/DateLUTImpl.h>
|
||||||
#include <common/types.h>
|
#include <common/types.h>
|
||||||
#include <Core/Block.h>
|
#include <Core/Block.h>
|
||||||
#include <Columns/ColumnString.h>
|
#include <Columns/ColumnString.h>
|
||||||
#include <Columns/ColumnNullable.h>
|
#include <Columns/ColumnNullable.h>
|
||||||
#include <Columns/ColumnArray.h>
|
#include <Columns/ColumnArray.h>
|
||||||
|
#include <Columns/ColumnTuple.h>
|
||||||
#include <Interpreters/castColumn.h>
|
#include <Interpreters/castColumn.h>
|
||||||
#include <algorithm>
|
#include <algorithm>
|
||||||
#include <DataTypes/DataTypeLowCardinality.h>
|
|
||||||
|
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
@ -309,8 +311,6 @@ namespace DB
|
|||||||
break;
|
break;
|
||||||
case arrow::Type::LIST:
|
case arrow::Type::LIST:
|
||||||
{
|
{
|
||||||
const auto * list_type = static_cast<arrow::ListType *>(arrow_column->type().get());
|
|
||||||
auto list_nested_type = list_type->value_type();
|
|
||||||
arrow::ArrayVector array_vector;
|
arrow::ArrayVector array_vector;
|
||||||
array_vector.reserve(arrow_column->num_chunks());
|
array_vector.reserve(arrow_column->num_chunks());
|
||||||
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
|
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
|
||||||
@ -326,6 +326,25 @@ namespace DB
|
|||||||
fillOffsetsFromArrowListColumn(arrow_column, column_array.getOffsetsColumn());
|
fillOffsetsFromArrowListColumn(arrow_column, column_array.getOffsetsColumn());
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
case arrow::Type::STRUCT:
|
||||||
|
{
|
||||||
|
ColumnTuple & column_tuple = typeid_cast<ColumnTuple &>(internal_column);
|
||||||
|
int fields_count = column_tuple.tupleSize();
|
||||||
|
std::vector<arrow::ArrayVector> nested_arrow_columns(fields_count);
|
||||||
|
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
|
||||||
|
{
|
||||||
|
arrow::StructArray & struct_chunk = static_cast<arrow::StructArray &>(*(arrow_column->chunk(chunk_i)));
|
||||||
|
for (int i = 0; i < fields_count; ++i)
|
||||||
|
nested_arrow_columns[i].emplace_back(struct_chunk.field(i));
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i != fields_count; ++i)
|
||||||
|
{
|
||||||
|
auto nested_arrow_column = std::make_shared<arrow::ChunkedArray>(nested_arrow_columns[i]);
|
||||||
|
readColumnFromArrowColumn(nested_arrow_column, column_tuple.getColumn(i), column_name, format_name, false);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
# define DISPATCH(ARROW_NUMERIC_TYPE, CPP_NUMERIC_TYPE) \
|
# define DISPATCH(ARROW_NUMERIC_TYPE, CPP_NUMERIC_TYPE) \
|
||||||
case ARROW_NUMERIC_TYPE: \
|
case ARROW_NUMERIC_TYPE: \
|
||||||
fillColumnWithNumericData<CPP_NUMERIC_TYPE>(arrow_column, internal_column); \
|
fillColumnWithNumericData<CPP_NUMERIC_TYPE>(arrow_column, internal_column); \
|
||||||
@ -372,6 +391,29 @@ namespace DB
|
|||||||
return std::make_shared<DataTypeArray>(getInternalType(list_nested_type, array_type->getNestedType(), column_name, format_name));
|
return std::make_shared<DataTypeArray>(getInternalType(list_nested_type, array_type->getNestedType(), column_name, format_name));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (arrow_type->id() == arrow::Type::STRUCT)
|
||||||
|
{
|
||||||
|
const auto * struct_type = static_cast<arrow::StructType *>(arrow_type.get());
|
||||||
|
const DataTypeTuple * tuple_type = typeid_cast<const DataTypeTuple *>(column_type.get());
|
||||||
|
if (!tuple_type)
|
||||||
|
throw Exception{"Cannot convert arrow STRUCT type to a not Tuple ClickHouse type " + column_type->getName(), ErrorCodes::CANNOT_CONVERT_TYPE};
|
||||||
|
|
||||||
|
const DataTypes & tuple_nested_types = tuple_type->getElements();
|
||||||
|
int internal_fields_num = tuple_nested_types.size();
|
||||||
|
/// If internal column has less elements then arrow struct, we will select only first internal_fields_num columns.
|
||||||
|
if (internal_fields_num > struct_type->num_fields())
|
||||||
|
throw Exception{
|
||||||
|
"Cannot convert arrow STRUCT with " + std::to_string(struct_type->num_fields()) + " fields to a ClickHouse Tuple with "
|
||||||
|
+ std::to_string(internal_fields_num) + " elements " + column_type->getName(),
|
||||||
|
ErrorCodes::CANNOT_CONVERT_TYPE};
|
||||||
|
|
||||||
|
DataTypes nested_types;
|
||||||
|
for (int i = 0; i < internal_fields_num; ++i)
|
||||||
|
nested_types.push_back(getInternalType(struct_type->field(i)->type(), tuple_nested_types[i], column_name, format_name));
|
||||||
|
|
||||||
|
return std::make_shared<DataTypeTuple>(std::move(nested_types));
|
||||||
|
}
|
||||||
|
|
||||||
if (const auto * internal_type_it = std::find_if(arrow_type_to_internal_type.begin(), arrow_type_to_internal_type.end(),
|
if (const auto * internal_type_it = std::find_if(arrow_type_to_internal_type.begin(), arrow_type_to_internal_type.end(),
|
||||||
[=](auto && elem) { return elem.first == arrow_type->id(); });
|
[=](auto && elem) { return elem.first == arrow_type->id(); });
|
||||||
internal_type_it != arrow_type_to_internal_type.end())
|
internal_type_it != arrow_type_to_internal_type.end())
|
||||||
|
@ -6,11 +6,13 @@
|
|||||||
#include <Columns/ColumnNullable.h>
|
#include <Columns/ColumnNullable.h>
|
||||||
#include <Columns/ColumnString.h>
|
#include <Columns/ColumnString.h>
|
||||||
#include <Columns/ColumnArray.h>
|
#include <Columns/ColumnArray.h>
|
||||||
|
#include <Columns/ColumnTuple.h>
|
||||||
#include <Core/callOnTypeIndex.h>
|
#include <Core/callOnTypeIndex.h>
|
||||||
#include <DataTypes/DataTypeDateTime.h>
|
#include <DataTypes/DataTypeDateTime.h>
|
||||||
#include <DataTypes/DataTypeNullable.h>
|
#include <DataTypes/DataTypeNullable.h>
|
||||||
#include <DataTypes/DataTypesDecimal.h>
|
#include <DataTypes/DataTypesDecimal.h>
|
||||||
#include <DataTypes/DataTypeArray.h>
|
#include <DataTypes/DataTypeArray.h>
|
||||||
|
#include <DataTypes/DataTypeTuple.h>
|
||||||
#include <Processors/Formats/IOutputFormat.h>
|
#include <Processors/Formats/IOutputFormat.h>
|
||||||
#include <arrow/api.h>
|
#include <arrow/api.h>
|
||||||
#include <arrow/builder.h>
|
#include <arrow/builder.h>
|
||||||
@ -113,7 +115,7 @@ namespace DB
|
|||||||
size_t start,
|
size_t start,
|
||||||
size_t end)
|
size_t end)
|
||||||
{
|
{
|
||||||
const auto * column_array = static_cast<const ColumnArray *>(column.get());
|
const auto * column_array = typeid_cast<const ColumnArray *>(column.get());
|
||||||
ColumnPtr nested_column = column_array->getDataPtr();
|
ColumnPtr nested_column = column_array->getDataPtr();
|
||||||
DataTypePtr nested_type = typeid_cast<const DataTypeArray *>(column_type.get())->getNestedType();
|
DataTypePtr nested_type = typeid_cast<const DataTypeArray *>(column_type.get())->getNestedType();
|
||||||
const auto & offsets = column_array->getOffsets();
|
const auto & offsets = column_array->getOffsets();
|
||||||
@ -124,13 +126,41 @@ namespace DB
|
|||||||
|
|
||||||
for (size_t array_idx = start; array_idx < end; ++array_idx)
|
for (size_t array_idx = start; array_idx < end; ++array_idx)
|
||||||
{
|
{
|
||||||
/// Start new array
|
/// Start new array.
|
||||||
components_status = builder.Append();
|
components_status = builder.Append();
|
||||||
checkStatus(components_status, nested_column->getName(), format_name);
|
checkStatus(components_status, nested_column->getName(), format_name);
|
||||||
fillArrowArray(column_name, nested_column, nested_type, null_bytemap, value_builder, format_name, offsets[array_idx - 1], offsets[array_idx]);
|
fillArrowArray(column_name, nested_column, nested_type, null_bytemap, value_builder, format_name, offsets[array_idx - 1], offsets[array_idx]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void fillArrowArrayWithTupleColumnData(
|
||||||
|
const String & column_name,
|
||||||
|
ColumnPtr & column,
|
||||||
|
const std::shared_ptr<const IDataType> & column_type,
|
||||||
|
const PaddedPODArray<UInt8> * null_bytemap,
|
||||||
|
arrow::ArrayBuilder * array_builder,
|
||||||
|
String format_name,
|
||||||
|
size_t start,
|
||||||
|
size_t end)
|
||||||
|
{
|
||||||
|
const auto * column_tuple = typeid_cast<const ColumnTuple *>(column.get());
|
||||||
|
const auto & nested_types = typeid_cast<const DataTypeTuple *>(column_type.get())->getElements();
|
||||||
|
|
||||||
|
arrow::StructBuilder & builder = assert_cast<arrow::StructBuilder &>(*array_builder);
|
||||||
|
|
||||||
|
for (size_t i = 0; i != column_tuple->tupleSize(); ++i)
|
||||||
|
{
|
||||||
|
ColumnPtr nested_column = column_tuple->getColumnPtr(i);
|
||||||
|
fillArrowArray(column_name + "." + std::to_string(i), nested_column, nested_types[i], null_bytemap, builder.field_builder(i), format_name, start, end);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (size_t i = start; i != end; ++i)
|
||||||
|
{
|
||||||
|
auto status = builder.Append();
|
||||||
|
checkStatus(status, column->getName(), format_name);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
template <typename ColumnType>
|
template <typename ColumnType>
|
||||||
static void fillArrowArrayWithStringColumnData(
|
static void fillArrowArrayWithStringColumnData(
|
||||||
ColumnPtr write_column,
|
ColumnPtr write_column,
|
||||||
@ -251,6 +281,10 @@ namespace DB
|
|||||||
{
|
{
|
||||||
fillArrowArrayWithArrayColumnData(column_name, column, column_type, null_bytemap, array_builder, format_name, start, end);
|
fillArrowArrayWithArrayColumnData(column_name, column, column_type, null_bytemap, array_builder, format_name, start, end);
|
||||||
}
|
}
|
||||||
|
else if ("Tuple" == column_type_name)
|
||||||
|
{
|
||||||
|
fillArrowArrayWithTupleColumnData(column_name, column, column_type, null_bytemap, array_builder, format_name, start, end);
|
||||||
|
}
|
||||||
else if (isDecimal(column_type))
|
else if (isDecimal(column_type))
|
||||||
{
|
{
|
||||||
auto fill_decimal = [&](const auto & types) -> bool
|
auto fill_decimal = [&](const auto & types) -> bool
|
||||||
@ -351,6 +385,19 @@ namespace DB
|
|||||||
return arrow::list(nested_arrow_type);
|
return arrow::list(nested_arrow_type);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (isTuple(column_type))
|
||||||
|
{
|
||||||
|
const auto & nested_types = typeid_cast<const DataTypeTuple *>(column_type.get())->getElements();
|
||||||
|
std::vector<std::shared_ptr<arrow::Field>> nested_fields;
|
||||||
|
for (size_t i = 0; i != nested_types.size(); ++i)
|
||||||
|
{
|
||||||
|
String name = column_name + "." + std::to_string(i);
|
||||||
|
auto nested_arrow_type = getArrowType(nested_types[i], name, format_name, is_column_nullable);
|
||||||
|
nested_fields.push_back(std::make_shared<arrow::Field>(name, nested_arrow_type, *is_column_nullable));
|
||||||
|
}
|
||||||
|
return arrow::struct_(std::move(nested_fields));
|
||||||
|
}
|
||||||
|
|
||||||
const std::string type_name = column_type->getFamilyName();
|
const std::string type_name = column_type->getFamilyName();
|
||||||
if (const auto * arrow_type_it = std::find_if(
|
if (const auto * arrow_type_it = std::find_if(
|
||||||
internal_type_to_arrow_type.begin(),
|
internal_type_to_arrow_type.begin(),
|
||||||
|
@ -67,7 +67,7 @@ void ORCBlockInputFormat::resetParser()
|
|||||||
stripe_current = 0;
|
stripe_current = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t countIndicesForType(std::shared_ptr<arrow::DataType> type)
|
static size_t countIndicesForType(std::shared_ptr<arrow::DataType> type)
|
||||||
{
|
{
|
||||||
if (type->id() == arrow::Type::LIST)
|
if (type->id() == arrow::Type::LIST)
|
||||||
return countIndicesForType(static_cast<arrow::ListType *>(type.get())->value_type()) + 1;
|
return countIndicesForType(static_cast<arrow::ListType *>(type.get())->value_type()) + 1;
|
||||||
|
@ -10,12 +10,14 @@
|
|||||||
#include <Columns/ColumnVector.h>
|
#include <Columns/ColumnVector.h>
|
||||||
#include <Columns/ColumnArray.h>
|
#include <Columns/ColumnArray.h>
|
||||||
#include <Columns/ColumnString.h>
|
#include <Columns/ColumnString.h>
|
||||||
|
#include <Columns/ColumnTuple.h>
|
||||||
|
|
||||||
#include <DataTypes/DataTypeDateTime.h>
|
#include <DataTypes/DataTypeDateTime.h>
|
||||||
#include <DataTypes/DataTypeDateTime64.h>
|
#include <DataTypes/DataTypeDateTime64.h>
|
||||||
#include <DataTypes/DataTypeNullable.h>
|
#include <DataTypes/DataTypeNullable.h>
|
||||||
#include <DataTypes/DataTypesDecimal.h>
|
#include <DataTypes/DataTypesDecimal.h>
|
||||||
#include <DataTypes/DataTypeArray.h>
|
#include <DataTypes/DataTypeArray.h>
|
||||||
|
#include <DataTypes/DataTypeTuple.h>
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
@ -46,15 +48,9 @@ void ORCOutputStream::write(const void* buf, size_t length)
|
|||||||
ORCBlockOutputFormat::ORCBlockOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_)
|
ORCBlockOutputFormat::ORCBlockOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_)
|
||||||
: IOutputFormat(header_, out_), format_settings{format_settings_}, output_stream(out_), data_types(header_.getDataTypes())
|
: IOutputFormat(header_, out_), format_settings{format_settings_}, output_stream(out_), data_types(header_.getDataTypes())
|
||||||
{
|
{
|
||||||
schema = orc::createStructType();
|
|
||||||
options.setCompression(orc::CompressionKind::CompressionKind_NONE);
|
|
||||||
size_t columns_count = header_.columns();
|
|
||||||
for (size_t i = 0; i != columns_count; ++i)
|
|
||||||
schema->addStructField(header_.safeGetByPosition(i).name, getORCType(data_types[i]));
|
|
||||||
writer = orc::createWriter(*schema, &output_stream, options);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ORC_UNIQUE_PTR<orc::Type> ORCBlockOutputFormat::getORCType(const DataTypePtr & type)
|
ORC_UNIQUE_PTR<orc::Type> ORCBlockOutputFormat::getORCType(const DataTypePtr & type, const std::string & column_name)
|
||||||
{
|
{
|
||||||
switch (type->getTypeId())
|
switch (type->getTypeId())
|
||||||
{
|
{
|
||||||
@ -102,12 +98,12 @@ ORC_UNIQUE_PTR<orc::Type> ORCBlockOutputFormat::getORCType(const DataTypePtr & t
|
|||||||
}
|
}
|
||||||
case TypeIndex::Nullable:
|
case TypeIndex::Nullable:
|
||||||
{
|
{
|
||||||
return getORCType(removeNullable(type));
|
return getORCType(removeNullable(type), column_name);
|
||||||
}
|
}
|
||||||
case TypeIndex::Array:
|
case TypeIndex::Array:
|
||||||
{
|
{
|
||||||
const auto * array_type = typeid_cast<const DataTypeArray *>(type.get());
|
const auto * array_type = typeid_cast<const DataTypeArray *>(type.get());
|
||||||
return orc::createListType(getORCType(array_type->getNestedType()));
|
return orc::createListType(getORCType(array_type->getNestedType(), column_name));
|
||||||
}
|
}
|
||||||
case TypeIndex::Decimal32:
|
case TypeIndex::Decimal32:
|
||||||
{
|
{
|
||||||
@ -124,6 +120,18 @@ ORC_UNIQUE_PTR<orc::Type> ORCBlockOutputFormat::getORCType(const DataTypePtr & t
|
|||||||
const auto * decimal_type = typeid_cast<const DataTypeDecimal<Decimal128> *>(type.get());
|
const auto * decimal_type = typeid_cast<const DataTypeDecimal<Decimal128> *>(type.get());
|
||||||
return orc::createDecimalType(decimal_type->getPrecision(), decimal_type->getScale());
|
return orc::createDecimalType(decimal_type->getPrecision(), decimal_type->getScale());
|
||||||
}
|
}
|
||||||
|
case TypeIndex::Tuple:
|
||||||
|
{
|
||||||
|
const auto * tuple_type = typeid_cast<const DataTypeTuple *>(type.get());
|
||||||
|
const auto & nested_types = tuple_type->getElements();
|
||||||
|
auto struct_type = orc::createStructType();
|
||||||
|
for (size_t i = 0; i < nested_types.size(); ++i)
|
||||||
|
{
|
||||||
|
String name = column_name + "." + std::to_string(i);
|
||||||
|
struct_type->addStructField(name, getORCType(nested_types[i], name));
|
||||||
|
}
|
||||||
|
return struct_type;
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
{
|
{
|
||||||
throw Exception("Type " + type->getName() + " is not supported for ORC output format", ErrorCodes::ILLEGAL_COLUMN);
|
throw Exception("Type " + type->getName() + " is not supported for ORC output format", ErrorCodes::ILLEGAL_COLUMN);
|
||||||
@ -149,6 +157,8 @@ void ORCBlockOutputFormat::writeNumbers(
|
|||||||
number_orc_column.notNull[i] = 0;
|
number_orc_column.notNull[i] = 0;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
number_orc_column.notNull[i] = 1;
|
||||||
number_orc_column.data[i] = convert(number_column.getElement(i));
|
number_orc_column.data[i] = convert(number_column.getElement(i));
|
||||||
}
|
}
|
||||||
number_orc_column.numElements = number_column.size();
|
number_orc_column.numElements = number_column.size();
|
||||||
@ -175,6 +185,9 @@ void ORCBlockOutputFormat::writeDecimals(
|
|||||||
decimal_orc_column.notNull[i] = 0;
|
decimal_orc_column.notNull[i] = 0;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
decimal_orc_column.notNull[i] = 1;
|
||||||
|
|
||||||
decimal_orc_column.values[i] = convert(decimal_column.getElement(i).value);
|
decimal_orc_column.values[i] = convert(decimal_column.getElement(i).value);
|
||||||
}
|
}
|
||||||
decimal_orc_column.numElements = decimal_column.size();
|
decimal_orc_column.numElements = decimal_column.size();
|
||||||
@ -197,6 +210,9 @@ void ORCBlockOutputFormat::writeStrings(
|
|||||||
string_orc_column.notNull[i] = 0;
|
string_orc_column.notNull[i] = 0;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
string_orc_column.notNull[i] = 1;
|
||||||
|
|
||||||
const StringRef & string = string_column.getDataAt(i);
|
const StringRef & string = string_column.getDataAt(i);
|
||||||
string_orc_column.data[i] = const_cast<char *>(string.data);
|
string_orc_column.data[i] = const_cast<char *>(string.data);
|
||||||
string_orc_column.length[i] = string.size;
|
string_orc_column.length[i] = string.size;
|
||||||
@ -223,6 +239,9 @@ void ORCBlockOutputFormat::writeDateTimes(
|
|||||||
timestamp_orc_column.notNull[i] = 0;
|
timestamp_orc_column.notNull[i] = 0;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
timestamp_orc_column.notNull[i] = 1;
|
||||||
|
|
||||||
timestamp_orc_column.data[i] = get_seconds(timestamp_column.getElement(i));
|
timestamp_orc_column.data[i] = get_seconds(timestamp_column.getElement(i));
|
||||||
timestamp_orc_column.nanoseconds[i] = get_nanoseconds(timestamp_column.getElement(i));
|
timestamp_orc_column.nanoseconds[i] = get_nanoseconds(timestamp_column.getElement(i));
|
||||||
}
|
}
|
||||||
@ -235,11 +254,10 @@ void ORCBlockOutputFormat::writeColumn(
|
|||||||
DataTypePtr & type,
|
DataTypePtr & type,
|
||||||
const PaddedPODArray<UInt8> * null_bytemap)
|
const PaddedPODArray<UInt8> * null_bytemap)
|
||||||
{
|
{
|
||||||
|
orc_column.notNull.resize(column.size());
|
||||||
if (null_bytemap)
|
if (null_bytemap)
|
||||||
{
|
|
||||||
orc_column.hasNulls = true;
|
orc_column.hasNulls = true;
|
||||||
orc_column.notNull.resize(column.size());
|
|
||||||
}
|
|
||||||
switch (type->getTypeId())
|
switch (type->getTypeId())
|
||||||
{
|
{
|
||||||
case TypeIndex::Int8:
|
case TypeIndex::Int8:
|
||||||
@ -374,12 +392,25 @@ void ORCBlockOutputFormat::writeColumn(
|
|||||||
for (size_t i = 0; i != list_column.size(); ++i)
|
for (size_t i = 0; i != list_column.size(); ++i)
|
||||||
{
|
{
|
||||||
list_orc_column.offsets[i + 1] = offsets[i];
|
list_orc_column.offsets[i + 1] = offsets[i];
|
||||||
|
list_orc_column.notNull[i] = 1;
|
||||||
}
|
}
|
||||||
orc::ColumnVectorBatch & nested_orc_column = *list_orc_column.elements;
|
orc::ColumnVectorBatch & nested_orc_column = *list_orc_column.elements;
|
||||||
writeColumn(nested_orc_column, list_column.getData(), nested_type, null_bytemap);
|
writeColumn(nested_orc_column, list_column.getData(), nested_type, null_bytemap);
|
||||||
list_orc_column.numElements = list_column.size();
|
list_orc_column.numElements = list_column.size();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
case TypeIndex::Tuple:
|
||||||
|
{
|
||||||
|
orc::StructVectorBatch & struct_orc_column = dynamic_cast<orc::StructVectorBatch &>(orc_column);
|
||||||
|
const auto & tuple_column = assert_cast<const ColumnTuple &>(column);
|
||||||
|
auto nested_types = typeid_cast<const DataTypeTuple *>(type.get())->getElements();
|
||||||
|
for (size_t i = 0; i != tuple_column.size(); ++i)
|
||||||
|
struct_orc_column.notNull[i] = 1;
|
||||||
|
for (size_t i = 0; i != tuple_column.tupleSize(); ++i)
|
||||||
|
writeColumn(*struct_orc_column.fields[i], tuple_column.getColumn(i), nested_types[i], null_bytemap);
|
||||||
|
break;
|
||||||
|
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
throw Exception("Type " + type->getName() + " is not supported for ORC output format", ErrorCodes::ILLEGAL_COLUMN);
|
throw Exception("Type " + type->getName() + " is not supported for ORC output format", ErrorCodes::ILLEGAL_COLUMN);
|
||||||
}
|
}
|
||||||
@ -409,6 +440,8 @@ size_t ORCBlockOutputFormat::getMaxColumnSize(Chunk & chunk)
|
|||||||
|
|
||||||
void ORCBlockOutputFormat::consume(Chunk chunk)
|
void ORCBlockOutputFormat::consume(Chunk chunk)
|
||||||
{
|
{
|
||||||
|
if (!writer)
|
||||||
|
prepareWriter();
|
||||||
size_t columns_num = chunk.getNumColumns();
|
size_t columns_num = chunk.getNumColumns();
|
||||||
size_t rows_num = chunk.getNumRows();
|
size_t rows_num = chunk.getNumRows();
|
||||||
/// getMaxColumnSize is needed to write arrays.
|
/// getMaxColumnSize is needed to write arrays.
|
||||||
@ -425,9 +458,23 @@ void ORCBlockOutputFormat::consume(Chunk chunk)
|
|||||||
|
|
||||||
void ORCBlockOutputFormat::finalize()
|
void ORCBlockOutputFormat::finalize()
|
||||||
{
|
{
|
||||||
|
if (!writer)
|
||||||
|
prepareWriter();
|
||||||
|
|
||||||
writer->close();
|
writer->close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void ORCBlockOutputFormat::prepareWriter()
|
||||||
|
{
|
||||||
|
const Block & header = getPort(PortKind::Main).getHeader();
|
||||||
|
schema = orc::createStructType();
|
||||||
|
options.setCompression(orc::CompressionKind::CompressionKind_NONE);
|
||||||
|
size_t columns_count = header.columns();
|
||||||
|
for (size_t i = 0; i != columns_count; ++i)
|
||||||
|
schema->addStructField(header.safeGetByPosition(i).name, getORCType(data_types[i], header.safeGetByPosition(i).name));
|
||||||
|
writer = orc::createWriter(*schema, &output_stream, options);
|
||||||
|
}
|
||||||
|
|
||||||
void registerOutputFormatProcessorORC(FormatFactory & factory)
|
void registerOutputFormatProcessorORC(FormatFactory & factory)
|
||||||
{
|
{
|
||||||
factory.registerOutputFormatProcessor("ORC", [](
|
factory.registerOutputFormatProcessor("ORC", [](
|
||||||
|
@ -43,7 +43,7 @@ public:
|
|||||||
void finalize() override;
|
void finalize() override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
ORC_UNIQUE_PTR<orc::Type> getORCType(const DataTypePtr & type);
|
ORC_UNIQUE_PTR<orc::Type> getORCType(const DataTypePtr & type, const std::string & column_name);
|
||||||
|
|
||||||
/// ConvertFunc is needed for type UInt8, because firstly UInt8 (char8_t) must be
|
/// ConvertFunc is needed for type UInt8, because firstly UInt8 (char8_t) must be
|
||||||
/// converted to unsigned char (bugprone-signed-char-misuse in clang).
|
/// converted to unsigned char (bugprone-signed-char-misuse in clang).
|
||||||
@ -71,6 +71,8 @@ private:
|
|||||||
size_t getColumnSize(const IColumn & column, DataTypePtr & type);
|
size_t getColumnSize(const IColumn & column, DataTypePtr & type);
|
||||||
size_t getMaxColumnSize(Chunk & chunk);
|
size_t getMaxColumnSize(Chunk & chunk);
|
||||||
|
|
||||||
|
void prepareWriter();
|
||||||
|
|
||||||
const FormatSettings format_settings;
|
const FormatSettings format_settings;
|
||||||
ORCOutputStream output_stream;
|
ORCOutputStream output_stream;
|
||||||
DataTypes data_types;
|
DataTypes data_types;
|
||||||
|
@ -67,6 +67,24 @@ void ParquetBlockInputFormat::resetParser()
|
|||||||
row_group_current = 0;
|
row_group_current = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static size_t countIndicesForType(std::shared_ptr<arrow::DataType> type)
|
||||||
|
{
|
||||||
|
if (type->id() == arrow::Type::LIST)
|
||||||
|
return countIndicesForType(static_cast<arrow::ListType *>(type.get())->value_type());
|
||||||
|
|
||||||
|
int indices = 0;
|
||||||
|
if (type->id() == arrow::Type::STRUCT)
|
||||||
|
{
|
||||||
|
auto * struct_type = static_cast<arrow::StructType *>(type.get());
|
||||||
|
for (int i = 0; i != struct_type->num_fields(); ++i)
|
||||||
|
indices += countIndicesForType(struct_type->field(i)->type());
|
||||||
|
}
|
||||||
|
else
|
||||||
|
indices = 1;
|
||||||
|
|
||||||
|
return indices;
|
||||||
|
}
|
||||||
|
|
||||||
void ParquetBlockInputFormat::prepareReader()
|
void ParquetBlockInputFormat::prepareReader()
|
||||||
{
|
{
|
||||||
THROW_ARROW_NOT_OK(parquet::arrow::OpenFile(asArrowFile(in), arrow::default_memory_pool(), &file_reader));
|
THROW_ARROW_NOT_OK(parquet::arrow::OpenFile(asArrowFile(in), arrow::default_memory_pool(), &file_reader));
|
||||||
@ -76,11 +94,14 @@ void ParquetBlockInputFormat::prepareReader()
|
|||||||
std::shared_ptr<arrow::Schema> schema;
|
std::shared_ptr<arrow::Schema> schema;
|
||||||
THROW_ARROW_NOT_OK(file_reader->GetSchema(&schema));
|
THROW_ARROW_NOT_OK(file_reader->GetSchema(&schema));
|
||||||
|
|
||||||
|
int index = 0;
|
||||||
for (int i = 0; i < schema->num_fields(); ++i)
|
for (int i = 0; i < schema->num_fields(); ++i)
|
||||||
{
|
{
|
||||||
if (getPort().getHeader().has(schema->field(i)->name()))
|
if (getPort().getHeader().has(schema->field(i)->name()))
|
||||||
{
|
{
|
||||||
column_indices.push_back(i);
|
int indexes_count = countIndicesForType(schema->field(i)->type());
|
||||||
|
for (int j = 0; j != indexes_count; ++j)
|
||||||
|
column_indices.push_back(index++);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,2 @@
|
|||||||
|
(1,2) ('1','2') ((1,'1'),1) ((1,2),('1','2')) ([1,2,3],1) (([1,2,3],[1,2,3]),([[1,2,3],[1,2,3]],1)) [([[1,2,3],[1,2,3]],([(1,2),(1,2)],1))]
|
||||||
|
(1,2) ('1','2') ((1,'1'),1) ((1,2),('1','2')) ([1,2,3],1) (([1,2,3],[1,2,3]),([[1,2,3],[1,2,3]],1)) [([[1,2,3],[1,2,3]],([(1,2),(1,2)],1))]
|
17
tests/queries/0_stateless/00900_orc_tuples_load.sh
Executable file
17
tests/queries/0_stateless/00900_orc_tuples_load.sh
Executable file
@ -0,0 +1,17 @@
|
|||||||
|
#!/usr/bin/env bash
|
||||||
|
|
||||||
|
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||||
|
# shellcheck source=../shell_config.sh
|
||||||
|
. "$CUR_DIR"/../shell_config.sh
|
||||||
|
|
||||||
|
${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS orc_tuples"
|
||||||
|
${CLICKHOUSE_CLIENT} --query="CREATE TABLE orc_tuples (t1 Tuple(UInt32, UInt32), t2 Tuple(String, String), t3 Tuple(Tuple(UInt32, String), UInt32), t4 Tuple(Tuple(UInt32, UInt32), Tuple(String, String)), t5 Tuple(Array(UInt32), UInt32), t6 Tuple(Tuple(Array(UInt32), Array(UInt32)), Tuple(Array(Array(UInt32)), UInt32)), t7 Array(Tuple(Array(Array(UInt32)), Tuple(Array(Tuple(UInt32, UInt32)), UInt32)))) ENGINE=Memory()"
|
||||||
|
|
||||||
|
${CLICKHOUSE_CLIENT} --query="INSERT INTO orc_tuples VALUES ((1, 2), ('1', '2'), ((1, '1'), 1), ((1, 2), ('1', '2')), ([1,2,3], 1), (([1,2,3], [1,2,3]), ([[1,2,3], [1,2,3]], 1)), [([[1,2,3], [1,2,3]], ([(1, 2), (1, 2)], 1))])"
|
||||||
|
|
||||||
|
${CLICKHOUSE_CLIENT} --query="SELECT * FROM orc_tuples FORMAT ORC" > "${CLICKHOUSE_TMP}"/tuples.orc
|
||||||
|
|
||||||
|
cat "${CLICKHOUSE_TMP}"/tuples.orc | ${CLICKHOUSE_CLIENT} -q "INSERT INTO orc_tuples FORMAT ORC"
|
||||||
|
|
||||||
|
${CLICKHOUSE_CLIENT} --query="SELECT * FROM orc_tuples"
|
||||||
|
${CLICKHOUSE_CLIENT} --query="DROP TABLE orc_tuples"
|
@ -0,0 +1,2 @@
|
|||||||
|
(1,2) ('1','2') ((1,'1'),1) ((1,2),('1','2')) ([1,2,3],1) (([1,2,3],[1,2,3]),([[1,2,3],[1,2,3]],1)) [([[1,2,3],[1,2,3]],([(1,2),(1,2)],1))]
|
||||||
|
(1,2) ('1','2') ((1,'1'),1) ((1,2),('1','2')) ([1,2,3],1) (([1,2,3],[1,2,3]),([[1,2,3],[1,2,3]],1)) [([[1,2,3],[1,2,3]],([(1,2),(1,2)],1))]
|
17
tests/queries/0_stateless/00900_parquet_tuples_load.sh
Executable file
17
tests/queries/0_stateless/00900_parquet_tuples_load.sh
Executable file
@ -0,0 +1,17 @@
|
|||||||
|
#!/usr/bin/env bash
|
||||||
|
|
||||||
|
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||||
|
# shellcheck source=../shell_config.sh
|
||||||
|
. "$CUR_DIR"/../shell_config.sh
|
||||||
|
|
||||||
|
${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS parquet_tuples"
|
||||||
|
${CLICKHOUSE_CLIENT} --query="CREATE TABLE parquet_tuples (t1 Tuple(UInt32, UInt32), t2 Tuple(String, String), t3 Tuple(Tuple(UInt32, String), UInt32), t4 Tuple(Tuple(UInt32, UInt32), Tuple(String, String)), t5 Tuple(Array(UInt32), UInt32), t6 Tuple(Tuple(Array(UInt32), Array(UInt32)), Tuple(Array(Array(UInt32)), UInt32)), t7 Array(Tuple(Array(Array(UInt32)), Tuple(Array(Tuple(UInt32, UInt32)), UInt32)))) ENGINE=Memory()"
|
||||||
|
|
||||||
|
${CLICKHOUSE_CLIENT} --query="INSERT INTO parquet_tuples VALUES ((1, 2), ('1', '2'), ((1, '1'), 1), ((1, 2), ('1', '2')), ([1,2,3], 1), (([1,2,3], [1,2,3]), ([[1,2,3], [1,2,3]], 1)), [([[1,2,3], [1,2,3]], ([(1, 2), (1, 2)], 1))])"
|
||||||
|
|
||||||
|
${CLICKHOUSE_CLIENT} --query="SELECT * FROM parquet_tuples FORMAT Parquet" > "${CLICKHOUSE_TMP}"/tuples.parquet
|
||||||
|
|
||||||
|
cat "${CLICKHOUSE_TMP}"/tuples.parquet | ${CLICKHOUSE_CLIENT} -q "INSERT INTO parquet_tuples FORMAT Parquet"
|
||||||
|
|
||||||
|
${CLICKHOUSE_CLIENT} --query="SELECT * FROM parquet_tuples"
|
||||||
|
${CLICKHOUSE_CLIENT} --query="DROP TABLE parquet_tuples"
|
@ -4,7 +4,7 @@ CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
|||||||
# shellcheck source=../shell_config.sh
|
# shellcheck source=../shell_config.sh
|
||||||
. "$CUR_DIR"/../shell_config.sh
|
. "$CUR_DIR"/../shell_config.sh
|
||||||
|
|
||||||
${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS orc_arrays"
|
${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS arrow_arrays"
|
||||||
${CLICKHOUSE_CLIENT} --query="CREATE TABLE arrow_arrays (arr1 Array(Int8), arr2 Array(UInt8), arr3 Array(Int16), arr4 Array(UInt16), arr5 Array(Int32), arr6 Array(UInt32), arr7 Array(Int64), arr8 Array(UInt64), arr9 Array(String), arr10 Array(FixedString(4)), arr11 Array(Float32), arr12 Array(Float64), arr13 Array(Date), arr14 Array(Datetime)) ENGINE=Memory()"
|
${CLICKHOUSE_CLIENT} --query="CREATE TABLE arrow_arrays (arr1 Array(Int8), arr2 Array(UInt8), arr3 Array(Int16), arr4 Array(UInt16), arr5 Array(Int32), arr6 Array(UInt32), arr7 Array(Int64), arr8 Array(UInt64), arr9 Array(String), arr10 Array(FixedString(4)), arr11 Array(Float32), arr12 Array(Float64), arr13 Array(Date), arr14 Array(Datetime)) ENGINE=Memory()"
|
||||||
|
|
||||||
${CLICKHOUSE_CLIENT} --query="INSERT INTO arrow_arrays VALUES ([1,-2,3],[1,2,3],[100,-200,300],[100,200,300],[10000000,-20000000,30000000],[10000000,2000000,3000000],[100000000000000,-200000000000,3000000000000],[100000000000000,20000000000000,3000000000000],['Some string','Some string','Some string'],['0000','1111','2222'],[42.42,424.2,0.4242],[424242.424242,4242042420.242424,42],['2000-01-01','2001-01-01','2002-01-01'],['2000-01-01 00:00:00','2001-01-01 00:00:00','2002-01-01 00:00:00']),([],[],[],[],[],[],[],[],[],[],[],[],[],[])"
|
${CLICKHOUSE_CLIENT} --query="INSERT INTO arrow_arrays VALUES ([1,-2,3],[1,2,3],[100,-200,300],[100,200,300],[10000000,-20000000,30000000],[10000000,2000000,3000000],[100000000000000,-200000000000,3000000000000],[100000000000000,20000000000000,3000000000000],['Some string','Some string','Some string'],['0000','1111','2222'],[42.42,424.2,0.4242],[424242.424242,4242042420.242424,42],['2000-01-01','2001-01-01','2002-01-01'],['2000-01-01 00:00:00','2001-01-01 00:00:00','2002-01-01 00:00:00']),([],[],[],[],[],[],[],[],[],[],[],[],[],[])"
|
||||||
|
@ -0,0 +1,2 @@
|
|||||||
|
(1,2) ('1','2') ((1,'1'),1) ((1,2),('1','2')) ([1,2,3],1) (([1,2,3],[1,2,3]),([[1,2,3],[1,2,3]],1)) [([[1,2,3],[1,2,3]],([(1,2),(1,2)],1))]
|
||||||
|
(1,2) ('1','2') ((1,'1'),1) ((1,2),('1','2')) ([1,2,3],1) (([1,2,3],[1,2,3]),([[1,2,3],[1,2,3]],1)) [([[1,2,3],[1,2,3]],([(1,2),(1,2)],1))]
|
17
tests/queries/0_stateless/01273_arrow_tuples_load.sh
Executable file
17
tests/queries/0_stateless/01273_arrow_tuples_load.sh
Executable file
@ -0,0 +1,17 @@
|
|||||||
|
#!/usr/bin/env bash
|
||||||
|
|
||||||
|
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||||
|
# shellcheck source=../shell_config.sh
|
||||||
|
. "$CUR_DIR"/../shell_config.sh
|
||||||
|
|
||||||
|
${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS arrow_tuples"
|
||||||
|
${CLICKHOUSE_CLIENT} --query="CREATE TABLE arrow_tuples (t1 Tuple(UInt32, UInt32), t2 Tuple(String, String), t3 Tuple(Tuple(UInt32, String), UInt32), t4 Tuple(Tuple(UInt32, UInt32), Tuple(String, String)), t5 Tuple(Array(UInt32), UInt32), t6 Tuple(Tuple(Array(UInt32), Array(UInt32)), Tuple(Array(Array(UInt32)), UInt32)), t7 Array(Tuple(Array(Array(UInt32)), Tuple(Array(Tuple(UInt32, UInt32)), UInt32)))) ENGINE=Memory()"
|
||||||
|
|
||||||
|
${CLICKHOUSE_CLIENT} --query="INSERT INTO arrow_tuples VALUES ((1, 2), ('1', '2'), ((1, '1'), 1), ((1, 2), ('1', '2')), ([1,2,3], 1), (([1,2,3], [1,2,3]), ([[1,2,3], [1,2,3]], 1)), [([[1,2,3], [1,2,3]], ([(1, 2), (1, 2)], 1))])"
|
||||||
|
|
||||||
|
${CLICKHOUSE_CLIENT} --query="SELECT * FROM arrow_tuples FORMAT Arrow" > "${CLICKHOUSE_TMP}"/tuples.arrow
|
||||||
|
|
||||||
|
cat "${CLICKHOUSE_TMP}"/tuples.arrow | ${CLICKHOUSE_CLIENT} -q "INSERT INTO arrow_tuples FORMAT Arrow"
|
||||||
|
|
||||||
|
${CLICKHOUSE_CLIENT} --query="SELECT * FROM arrow_tuples"
|
||||||
|
${CLICKHOUSE_CLIENT} --query="DROP TABLE arrow_tuples"
|
Loading…
Reference in New Issue
Block a user