Small refactors in ORC output format

This commit is contained in:
taiyang-li 2024-08-06 13:23:12 +08:00
parent c629e2af50
commit e6f566e49d

View File

@ -12,6 +12,7 @@
#include <Columns/ColumnString.h> #include <Columns/ColumnString.h>
#include <Columns/ColumnTuple.h> #include <Columns/ColumnTuple.h>
#include <Columns/ColumnMap.h> #include <Columns/ColumnMap.h>
#include <Columns/ColumnsCommon.h>
#include <DataTypes/DataTypeDateTime.h> #include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h> #include <DataTypes/DataTypeDateTime64.h>
@ -203,25 +204,15 @@ template <typename NumberType, typename NumberVectorBatch, typename ConvertFunc>
void ORCBlockOutputFormat::writeNumbers( void ORCBlockOutputFormat::writeNumbers(
orc::ColumnVectorBatch & orc_column, orc::ColumnVectorBatch & orc_column,
const IColumn & column, const IColumn & column,
const PaddedPODArray<UInt8> * null_bytemap, const PaddedPODArray<UInt8> * /*null_bytemap*/,
ConvertFunc convert) ConvertFunc convert)
{ {
NumberVectorBatch & number_orc_column = dynamic_cast<NumberVectorBatch &>(orc_column); NumberVectorBatch & number_orc_column = dynamic_cast<NumberVectorBatch &>(orc_column);
const auto & number_column = assert_cast<const ColumnVector<NumberType> &>(column); const auto & number_column = assert_cast<const ColumnVector<NumberType> &>(column);
number_orc_column.resize(number_column.size());
number_orc_column.data.resize(number_column.size());
for (size_t i = 0; i != number_column.size(); ++i) for (size_t i = 0; i != number_column.size(); ++i)
{
if (null_bytemap && (*null_bytemap)[i])
{
number_orc_column.notNull[i] = 0;
continue;
}
number_orc_column.notNull[i] = 1;
number_orc_column.data[i] = convert(number_column.getElement(i)); number_orc_column.data[i] = convert(number_column.getElement(i));
}
number_orc_column.numElements = number_column.size();
} }
template <typename Decimal, typename DecimalVectorBatch, typename ConvertFunc> template <typename Decimal, typename DecimalVectorBatch, typename ConvertFunc>
@ -229,7 +220,7 @@ void ORCBlockOutputFormat::writeDecimals(
orc::ColumnVectorBatch & orc_column, orc::ColumnVectorBatch & orc_column,
const IColumn & column, const IColumn & column,
DataTypePtr & type, DataTypePtr & type,
const PaddedPODArray<UInt8> * null_bytemap, const PaddedPODArray<UInt8> * /*null_bytemap*/,
ConvertFunc convert) ConvertFunc convert)
{ {
DecimalVectorBatch & decimal_orc_column = dynamic_cast<DecimalVectorBatch &>(orc_column); DecimalVectorBatch & decimal_orc_column = dynamic_cast<DecimalVectorBatch &>(orc_column);
@ -238,71 +229,49 @@ void ORCBlockOutputFormat::writeDecimals(
decimal_orc_column.precision = decimal_type->getPrecision(); decimal_orc_column.precision = decimal_type->getPrecision();
decimal_orc_column.scale = decimal_type->getScale(); decimal_orc_column.scale = decimal_type->getScale();
decimal_orc_column.resize(decimal_column.size()); decimal_orc_column.resize(decimal_column.size());
for (size_t i = 0; i != decimal_column.size(); ++i)
{
if (null_bytemap && (*null_bytemap)[i])
{
decimal_orc_column.notNull[i] = 0;
continue;
}
decimal_orc_column.notNull[i] = 1; decimal_orc_column.values.resize(decimal_column.size());
for (size_t i = 0; i != decimal_column.size(); ++i)
decimal_orc_column.values[i] = convert(decimal_column.getElement(i).value); decimal_orc_column.values[i] = convert(decimal_column.getElement(i).value);
}
decimal_orc_column.numElements = decimal_column.size();
} }
template <typename ColumnType> template <typename ColumnType>
void ORCBlockOutputFormat::writeStrings( void ORCBlockOutputFormat::writeStrings(
orc::ColumnVectorBatch & orc_column, orc::ColumnVectorBatch & orc_column,
const IColumn & column, const IColumn & column,
const PaddedPODArray<UInt8> * null_bytemap) const PaddedPODArray<UInt8> * /*null_bytemap*/)
{ {
orc::StringVectorBatch & string_orc_column = dynamic_cast<orc::StringVectorBatch &>(orc_column); orc::StringVectorBatch & string_orc_column = dynamic_cast<orc::StringVectorBatch &>(orc_column);
const auto & string_column = assert_cast<const ColumnType &>(column); const auto & string_column = assert_cast<const ColumnType &>(column);
string_orc_column.resize(string_column.size());
string_orc_column.data.resize(string_column.size());
string_orc_column.length.resize(string_column.size());
for (size_t i = 0; i != string_column.size(); ++i) for (size_t i = 0; i != string_column.size(); ++i)
{ {
if (null_bytemap && (*null_bytemap)[i])
{
string_orc_column.notNull[i] = 0;
continue;
}
string_orc_column.notNull[i] = 1;
const std::string_view & string = string_column.getDataAt(i).toView(); const std::string_view & string = string_column.getDataAt(i).toView();
string_orc_column.data[i] = const_cast<char *>(string.data()); string_orc_column.data[i] = const_cast<char *>(string.data());
string_orc_column.length[i] = string.size(); string_orc_column.length[i] = string.size();
} }
string_orc_column.numElements = string_column.size();
} }
template <typename ColumnType, typename GetSecondsFunc, typename GetNanosecondsFunc> template <typename ColumnType, typename GetSecondsFunc, typename GetNanosecondsFunc>
void ORCBlockOutputFormat::writeDateTimes( void ORCBlockOutputFormat::writeDateTimes(
orc::ColumnVectorBatch & orc_column, orc::ColumnVectorBatch & orc_column,
const IColumn & column, const IColumn & column,
const PaddedPODArray<UInt8> * null_bytemap, const PaddedPODArray<UInt8> * /*null_bytemap*/,
GetSecondsFunc get_seconds, GetSecondsFunc get_seconds,
GetNanosecondsFunc get_nanoseconds) GetNanosecondsFunc get_nanoseconds)
{ {
orc::TimestampVectorBatch & timestamp_orc_column = dynamic_cast<orc::TimestampVectorBatch &>(orc_column); orc::TimestampVectorBatch & timestamp_orc_column = dynamic_cast<orc::TimestampVectorBatch &>(orc_column);
const auto & timestamp_column = assert_cast<const ColumnType &>(column); const auto & timestamp_column = assert_cast<const ColumnType &>(column);
timestamp_orc_column.resize(timestamp_column.size());
timestamp_orc_column.data.resize(timestamp_column.size());
timestamp_orc_column.nanoseconds.resize(timestamp_column.size());
for (size_t i = 0; i != timestamp_column.size(); ++i) for (size_t i = 0; i != timestamp_column.size(); ++i)
{ {
if (null_bytemap && (*null_bytemap)[i])
{
timestamp_orc_column.notNull[i] = 0;
continue;
}
timestamp_orc_column.notNull[i] = 1;
timestamp_orc_column.data[i] = static_cast<int64_t>(get_seconds(timestamp_column.getElement(i))); timestamp_orc_column.data[i] = static_cast<int64_t>(get_seconds(timestamp_column.getElement(i)));
timestamp_orc_column.nanoseconds[i] = static_cast<int64_t>(get_nanoseconds(timestamp_column.getElement(i))); timestamp_orc_column.nanoseconds[i] = static_cast<int64_t>(get_nanoseconds(timestamp_column.getElement(i)));
} }
timestamp_orc_column.numElements = timestamp_column.size();
} }
void ORCBlockOutputFormat::writeColumn( void ORCBlockOutputFormat::writeColumn(
@ -311,9 +280,19 @@ void ORCBlockOutputFormat::writeColumn(
DataTypePtr & type, DataTypePtr & type,
const PaddedPODArray<UInt8> * null_bytemap) const PaddedPODArray<UInt8> * null_bytemap)
{ {
orc_column.notNull.resize(column.size()); orc_column.numElements = column.size();
if (null_bytemap) if (null_bytemap)
orc_column.hasNulls = true; {
orc_column.hasNulls = !memoryIsZero(null_bytemap->data(), 0, null_bytemap->size());
if (orc_column.hasNulls)
{
orc_column.notNull.resize(null_bytemap->size());
for (size_t i = 0; i < null_bytemap->size(); ++i)
orc_column.notNull[i] = !(*null_bytemap)[i];
}
}
else
orc_column.hasNulls = false;
/// ORC doesn't have unsigned types, so cast everything to signed and sign-extend to Int64 to /// ORC doesn't have unsigned types, so cast everything to signed and sign-extend to Int64 to
/// make the ORC library calculate min and max correctly. /// make the ORC library calculate min and max correctly.
@ -471,6 +450,7 @@ void ORCBlockOutputFormat::writeColumn(
} }
case TypeIndex::Nullable: case TypeIndex::Nullable:
{ {
chassert(!null_bytemap);
const auto & nullable_column = assert_cast<const ColumnNullable &>(column); const auto & nullable_column = assert_cast<const ColumnNullable &>(column);
const PaddedPODArray<UInt8> & new_null_bytemap = assert_cast<const ColumnVector<UInt8> &>(*nullable_column.getNullMapColumnPtr()).getData(); const PaddedPODArray<UInt8> & new_null_bytemap = assert_cast<const ColumnVector<UInt8> &>(*nullable_column.getNullMapColumnPtr()).getData();
auto nested_type = removeNullable(type); auto nested_type = removeNullable(type);
@ -485,19 +465,15 @@ void ORCBlockOutputFormat::writeColumn(
const ColumnArray::Offsets & offsets = list_column.getOffsets(); const ColumnArray::Offsets & offsets = list_column.getOffsets();
size_t column_size = list_column.size(); size_t column_size = list_column.size();
list_orc_column.resize(column_size); list_orc_column.offsets.resize(column_size + 1);
/// The length of list i in ListVectorBatch is offsets[i+1] - offsets[i]. /// The length of list i in ListVectorBatch is offsets[i+1] - offsets[i].
list_orc_column.offsets[0] = 0; list_orc_column.offsets[0] = 0;
for (size_t i = 0; i != column_size; ++i) for (size_t i = 0; i != column_size; ++i)
{
list_orc_column.offsets[i + 1] = offsets[i]; list_orc_column.offsets[i + 1] = offsets[i];
list_orc_column.notNull[i] = 1;
}
orc::ColumnVectorBatch & nested_orc_column = *list_orc_column.elements; orc::ColumnVectorBatch & nested_orc_column = *list_orc_column.elements;
writeColumn(nested_orc_column, list_column.getData(), nested_type, null_bytemap); writeColumn(nested_orc_column, list_column.getData(), nested_type, nullptr);
list_orc_column.numElements = column_size;
break; break;
} }
case TypeIndex::Tuple: case TypeIndex::Tuple:
@ -505,10 +481,8 @@ void ORCBlockOutputFormat::writeColumn(
orc::StructVectorBatch & struct_orc_column = dynamic_cast<orc::StructVectorBatch &>(orc_column); orc::StructVectorBatch & struct_orc_column = dynamic_cast<orc::StructVectorBatch &>(orc_column);
const auto & tuple_column = assert_cast<const ColumnTuple &>(column); const auto & tuple_column = assert_cast<const ColumnTuple &>(column);
auto nested_types = assert_cast<const DataTypeTuple *>(type.get())->getElements(); auto nested_types = assert_cast<const DataTypeTuple *>(type.get())->getElements();
for (size_t i = 0; i != tuple_column.size(); ++i)
struct_orc_column.notNull[i] = 1;
for (size_t i = 0; i != tuple_column.tupleSize(); ++i) for (size_t i = 0; i != tuple_column.tupleSize(); ++i)
writeColumn(*struct_orc_column.fields[i], tuple_column.getColumn(i), nested_types[i], null_bytemap); writeColumn(*struct_orc_column.fields[i], tuple_column.getColumn(i), nested_types[i], nullptr);
break; break;
} }
case TypeIndex::Map: case TypeIndex::Map:
@ -520,25 +494,21 @@ void ORCBlockOutputFormat::writeColumn(
size_t column_size = list_column.size(); size_t column_size = list_column.size();
map_orc_column.resize(list_column.size()); map_orc_column.offsets.resize(column_size + 1);
/// The length of list i in ListVectorBatch is offsets[i+1] - offsets[i]. /// The length of list i in ListVectorBatch is offsets[i+1] - offsets[i].
map_orc_column.offsets[0] = 0; map_orc_column.offsets[0] = 0;
for (size_t i = 0; i != column_size; ++i) for (size_t i = 0; i != column_size; ++i)
{
map_orc_column.offsets[i + 1] = offsets[i]; map_orc_column.offsets[i + 1] = offsets[i];
map_orc_column.notNull[i] = 1;
}
const auto nested_columns = assert_cast<const ColumnTuple *>(list_column.getDataPtr().get())->getColumns(); const auto nested_columns = assert_cast<const ColumnTuple *>(list_column.getDataPtr().get())->getColumns();
orc::ColumnVectorBatch & keys_orc_column = *map_orc_column.keys; orc::ColumnVectorBatch & keys_orc_column = *map_orc_column.keys;
auto key_type = map_type.getKeyType(); auto key_type = map_type.getKeyType();
writeColumn(keys_orc_column, *nested_columns[0], key_type, null_bytemap); writeColumn(keys_orc_column, *nested_columns[0], key_type, nullptr);
orc::ColumnVectorBatch & values_orc_column = *map_orc_column.elements; orc::ColumnVectorBatch & values_orc_column = *map_orc_column.elements;
auto value_type = map_type.getValueType(); auto value_type = map_type.getValueType();
writeColumn(values_orc_column, *nested_columns[1], value_type, null_bytemap); writeColumn(values_orc_column, *nested_columns[1], value_type, nullptr);
map_orc_column.numElements = column_size;
break; break;
} }
default: default:
@ -575,10 +545,7 @@ void ORCBlockOutputFormat::consume(Chunk chunk)
size_t columns_num = chunk.getNumColumns(); size_t columns_num = chunk.getNumColumns();
size_t rows_num = chunk.getNumRows(); size_t rows_num = chunk.getNumRows();
/// getMaxColumnSize is needed to write arrays. std::unique_ptr<orc::ColumnVectorBatch> batch = writer->createRowBatch(chunk.getNumRows());
/// The size of the batch must be no less than total amount of array elements
/// and no less than the number of rows (ORC writes a null bit for every row).
std::unique_ptr<orc::ColumnVectorBatch> batch = writer->createRowBatch(getMaxColumnSize(chunk));
orc::StructVectorBatch & root = dynamic_cast<orc::StructVectorBatch &>(*batch); orc::StructVectorBatch & root = dynamic_cast<orc::StructVectorBatch &>(*batch);
auto columns = chunk.detachColumns(); auto columns = chunk.detachColumns();