mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-23 08:02:02 +00:00
Resolve conflicts, fix build and tests
This commit is contained in:
parent
ee167e9ee8
commit
607d4dcc0b
@ -2,6 +2,7 @@
|
||||
#include "ArrowColumnToCHColumn.h"
|
||||
|
||||
#if USE_ARROW || USE_ORC || USE_PARQUET
|
||||
|
||||
#include <DataTypes/DataTypeFactory.h>
|
||||
#include <DataTypes/DataTypeNullable.h>
|
||||
#include <DataTypes/DataTypesDecimal.h>
|
||||
@ -10,7 +11,8 @@
|
||||
#include <DataTypes/DataTypeLowCardinality.h>
|
||||
#include <DataTypes/DataTypeTuple.h>
|
||||
#include <DataTypes/DataTypeMap.h>
|
||||
#include <DataTypes/DataTypeDate.h>
|
||||
#include <DataTypes/DataTypeString.h>
|
||||
#include <DataTypes/DataTypeDate32.h>
|
||||
#include <DataTypes/NestedUtils.h>
|
||||
#include <common/DateLUTImpl.h>
|
||||
#include <common/types.h>
|
||||
@ -23,17 +25,41 @@
|
||||
#include <Columns/ColumnUnique.h>
|
||||
#include <Columns/ColumnMap.h>
|
||||
#include <Columns/ColumnsNumber.h>
|
||||
#include <Processors/Chunk.h>
|
||||
#include <Interpreters/castColumn.h>
|
||||
#include <algorithm>
|
||||
#include <fmt/format.h>
|
||||
#include <arrow/builder.h>
|
||||
#include <arrow/array.h>
|
||||
|
||||
|
||||
#define FOR_ARROW_NUMERIC_TYPES(M) \
|
||||
M(arrow::Type::UINT8, DB::UInt8) \
|
||||
M(arrow::Type::INT8, DB::Int8) \
|
||||
M(arrow::Type::UINT16, DB::UInt16) \
|
||||
M(arrow::Type::INT16, DB::Int16) \
|
||||
M(arrow::Type::UINT32, DB::UInt32) \
|
||||
M(arrow::Type::INT32, DB::Int32) \
|
||||
M(arrow::Type::UINT64, DB::UInt64) \
|
||||
M(arrow::Type::INT64, DB::Int64) \
|
||||
M(arrow::Type::HALF_FLOAT, DB::Float32) \
|
||||
M(arrow::Type::FLOAT, DB::Float32) \
|
||||
M(arrow::Type::DOUBLE, DB::Float64)
|
||||
|
||||
#define FOR_ARROW_INDEXES_TYPES(M) \
|
||||
M(arrow::Type::UINT8, DB::UInt8) \
|
||||
M(arrow::Type::INT8, DB::UInt8) \
|
||||
M(arrow::Type::UINT16, DB::UInt16) \
|
||||
M(arrow::Type::INT16, DB::UInt16) \
|
||||
M(arrow::Type::UINT32, DB::UInt32) \
|
||||
M(arrow::Type::INT32, DB::UInt32) \
|
||||
M(arrow::Type::UINT64, DB::UInt64) \
|
||||
M(arrow::Type::INT64, DB::UInt64)
|
||||
|
||||
namespace DB
|
||||
{
|
||||
namespace ErrorCodes
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int UNKNOWN_TYPE;
|
||||
extern const int VALUE_IS_OUT_OF_RANGE_OF_DATA_TYPE;
|
||||
extern const int CANNOT_CONVERT_TYPE;
|
||||
@ -41,18 +67,13 @@ namespace DB
|
||||
extern const int THERE_IS_NO_COLUMN;
|
||||
extern const int BAD_ARGUMENTS;
|
||||
extern const int UNKNOWN_EXCEPTION;
|
||||
}
|
||||
}
|
||||
|
||||
static void checkStatus(const arrow::Status & status, const String & column_name, const String & format_name)
|
||||
{
|
||||
if (!status.ok())
|
||||
throw Exception{fmt::format("Error with a {} column \"{}\": {}.", format_name, column_name, status.ToString()), ErrorCodes::UNKNOWN_EXCEPTION};
|
||||
}
|
||||
|
||||
/// Inserts numeric data right into internal column data to reduce an overhead
|
||||
template <typename NumericType, typename VectorType = ColumnVector<NumericType>>
|
||||
static ColumnWithTypeAndName readColumnWithNumericData(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
|
||||
{
|
||||
/// Inserts numeric data right into internal column data to reduce an overhead
|
||||
template <typename NumericType, typename VectorType = ColumnVector<NumericType>>
|
||||
static ColumnWithTypeAndName readColumnWithNumericData(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
|
||||
{
|
||||
auto internal_type = std::make_shared<DataTypeNumber<NumericType>>();
|
||||
auto internal_column = internal_type->createColumn();
|
||||
auto & column_data = static_cast<VectorType &>(*internal_column).getData();
|
||||
@ -68,13 +89,13 @@ namespace DB
|
||||
column_data.insert_assume_reserved(raw_data, raw_data + chunk->length());
|
||||
}
|
||||
return {std::move(internal_column), std::move(internal_type), column_name};
|
||||
}
|
||||
}
|
||||
|
||||
/// Inserts chars and offsets right into internal column data to reduce an overhead.
|
||||
/// Internal offsets are shifted by one to the right in comparison with Arrow ones. So the last offset should map to the end of all chars.
|
||||
/// Also internal strings are null terminated.
|
||||
static ColumnWithTypeAndName readColumnWithStringData(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
|
||||
{
|
||||
/// Inserts chars and offsets right into internal column data to reduce an overhead.
|
||||
/// Internal offsets are shifted by one to the right in comparison with Arrow ones. So the last offset should map to the end of all chars.
|
||||
/// Also internal strings are null terminated.
|
||||
static ColumnWithTypeAndName readColumnWithStringData(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
|
||||
{
|
||||
auto internal_type = std::make_shared<DataTypeString>();
|
||||
auto internal_column = internal_type->createColumn();
|
||||
PaddedPODArray<UInt8> & column_chars_t = assert_cast<ColumnString &>(*internal_column).getChars();
|
||||
@ -83,7 +104,7 @@ namespace DB
|
||||
size_t chars_t_size = 0;
|
||||
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
|
||||
{
|
||||
arrow::BinaryArray & chunk = static_cast<arrow::BinaryArray &>(*(arrow_column->chunk(chunk_i)));
|
||||
arrow::BinaryArray & chunk = dynamic_cast<arrow::BinaryArray &>(*(arrow_column->chunk(chunk_i)));
|
||||
const size_t chunk_length = chunk.length();
|
||||
|
||||
if (chunk_length > 0)
|
||||
@ -98,7 +119,7 @@ namespace DB
|
||||
|
||||
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
|
||||
{
|
||||
arrow::BinaryArray & chunk = static_cast<arrow::BinaryArray &>(*(arrow_column->chunk(chunk_i)));
|
||||
arrow::BinaryArray & chunk = dynamic_cast<arrow::BinaryArray &>(*(arrow_column->chunk(chunk_i)));
|
||||
std::shared_ptr<arrow::Buffer> buffer = chunk.value_data();
|
||||
const size_t chunk_length = chunk.length();
|
||||
|
||||
@ -115,10 +136,10 @@ namespace DB
|
||||
}
|
||||
}
|
||||
return {std::move(internal_column), std::move(internal_type), column_name};
|
||||
}
|
||||
}
|
||||
|
||||
static ColumnWithTypeAndName readColumnWithBooleanData(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
|
||||
{
|
||||
static ColumnWithTypeAndName readColumnWithBooleanData(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
|
||||
{
|
||||
auto internal_type = std::make_shared<DataTypeUInt8>();
|
||||
auto internal_column = internal_type->createColumn();
|
||||
auto & column_data = assert_cast<ColumnVector<UInt8> &>(*internal_column).getData();
|
||||
@ -126,7 +147,7 @@ namespace DB
|
||||
|
||||
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
|
||||
{
|
||||
arrow::BooleanArray & chunk = static_cast<arrow::BooleanArray &>(*(arrow_column->chunk(chunk_i)));
|
||||
arrow::BooleanArray & chunk = dynamic_cast<arrow::BooleanArray &>(*(arrow_column->chunk(chunk_i)));
|
||||
/// buffers[0] is a null bitmap and buffers[1] are actual values
|
||||
std::shared_ptr<arrow::Buffer> buffer = chunk.data()->buffers[1];
|
||||
|
||||
@ -134,18 +155,18 @@ namespace DB
|
||||
column_data.emplace_back(chunk.Value(bool_i));
|
||||
}
|
||||
return {std::move(internal_column), std::move(internal_type), column_name};
|
||||
}
|
||||
}
|
||||
|
||||
static ColumnWithTypeAndName readColumnWithDate32Data(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
|
||||
{
|
||||
auto internal_type = std::make_shared<DataTypeUInt16>();
|
||||
static ColumnWithTypeAndName readColumnWithDate32Data(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
|
||||
{
|
||||
auto internal_type = std::make_shared<DataTypeDate32>();
|
||||
auto internal_column = internal_type->createColumn();
|
||||
PaddedPODArray<UInt16> & column_data = assert_cast<ColumnVector<UInt16> &>(*internal_column).getData();
|
||||
PaddedPODArray<Int32> & column_data = assert_cast<ColumnVector<Int32> &>(*internal_column).getData();
|
||||
column_data.reserve(arrow_column->length());
|
||||
|
||||
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
|
||||
{
|
||||
arrow::Date32Array & chunk = static_cast<arrow::Date32Array &>(*(arrow_column->chunk(chunk_i)));
|
||||
arrow::Date32Array & chunk = dynamic_cast<arrow::Date32Array &>(*(arrow_column->chunk(chunk_i)));
|
||||
|
||||
for (size_t value_i = 0, length = static_cast<size_t>(chunk.length()); value_i < length; ++value_i)
|
||||
{
|
||||
@ -155,8 +176,8 @@ namespace DB
|
||||
// TODO: will it rollback correctly?
|
||||
throw Exception
|
||||
{
|
||||
fmt::format("Input value {} of a column \"{}\" is greater than max allowed Date value, which is {}", days_num, column_name, DATE_LUT_MAX_DAY_NUM),
|
||||
ErrorCodes::VALUE_IS_OUT_OF_RANGE_OF_DATA_TYPE
|
||||
ErrorCodes::VALUE_IS_OUT_OF_RANGE_OF_DATA_TYPE,
|
||||
"Input value {} of a column \"{}\" is greater than max allowed Date value, which is {}", days_num, column_name, DATE_LUT_MAX_DAY_NUM,
|
||||
};
|
||||
}
|
||||
|
||||
@ -164,11 +185,11 @@ namespace DB
|
||||
}
|
||||
}
|
||||
return {std::move(internal_column), std::move(internal_type), column_name};
|
||||
}
|
||||
}
|
||||
|
||||
/// Arrow stores Parquet::DATETIME in Int64, while ClickHouse stores DateTime in UInt32. Therefore, it should be checked before saving
|
||||
static ColumnWithTypeAndName readColumnWithDate64Data(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
|
||||
{
|
||||
/// Arrow stores Parquet::DATETIME in Int64, while ClickHouse stores DateTime in UInt32. Therefore, it should be checked before saving
|
||||
static ColumnWithTypeAndName readColumnWithDate64Data(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
|
||||
{
|
||||
auto internal_type = std::make_shared<DataTypeUInt32>();
|
||||
auto internal_column = internal_type->createColumn();
|
||||
auto & column_data = assert_cast<ColumnVector<UInt32> &>(*internal_column).getData();
|
||||
@ -176,7 +197,7 @@ namespace DB
|
||||
|
||||
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
|
||||
{
|
||||
auto & chunk = static_cast<arrow::Date64Array &>(*(arrow_column->chunk(chunk_i)));
|
||||
auto & chunk = dynamic_cast<arrow::Date64Array &>(*(arrow_column->chunk(chunk_i)));
|
||||
for (size_t value_i = 0, length = static_cast<size_t>(chunk.length()); value_i < length; ++value_i)
|
||||
{
|
||||
auto timestamp = static_cast<UInt32>(chunk.Value(value_i) / 1000); // Always? in ms
|
||||
@ -184,10 +205,10 @@ namespace DB
|
||||
}
|
||||
}
|
||||
return {std::move(internal_column), std::move(internal_type), column_name};
|
||||
}
|
||||
}
|
||||
|
||||
static ColumnWithTypeAndName readColumnWithTimestampData(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
|
||||
{
|
||||
static ColumnWithTypeAndName readColumnWithTimestampData(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
|
||||
{
|
||||
auto internal_type = std::make_shared<DataTypeUInt32>();
|
||||
auto internal_column = internal_type->createColumn();
|
||||
auto & column_data = assert_cast<ColumnVector<UInt32> &>(*internal_column).getData();
|
||||
@ -195,7 +216,7 @@ namespace DB
|
||||
|
||||
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
|
||||
{
|
||||
auto & chunk = static_cast<arrow::TimestampArray &>(*(arrow_column->chunk(chunk_i)));
|
||||
auto & chunk = dynamic_cast<arrow::TimestampArray &>(*(arrow_column->chunk(chunk_i)));
|
||||
const auto & type = static_cast<const ::arrow::TimestampType &>(*chunk.type());
|
||||
|
||||
UInt32 divide = 1;
|
||||
@ -223,11 +244,11 @@ namespace DB
|
||||
}
|
||||
}
|
||||
return {std::move(internal_column), std::move(internal_type), column_name};
|
||||
}
|
||||
}
|
||||
|
||||
template <typename DecimalType, typename DecimalArray>
|
||||
static ColumnWithTypeAndName readColumnWithDecimalData(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
|
||||
{
|
||||
template <typename DecimalType, typename DecimalArray>
|
||||
static ColumnWithTypeAndName readColumnWithDecimalData(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
|
||||
{
|
||||
const auto * arrow_decimal_type = static_cast<arrow::DecimalType *>(arrow_column->type().get());
|
||||
auto internal_type = std::make_shared<DataTypeDecimal<DecimalType>>(arrow_decimal_type->precision(), arrow_decimal_type->scale());
|
||||
auto internal_column = internal_type->createColumn();
|
||||
@ -237,18 +258,18 @@ namespace DB
|
||||
|
||||
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
|
||||
{
|
||||
auto & chunk = static_cast<DecimalArray &>(*(arrow_column->chunk(chunk_i)));
|
||||
auto & chunk = dynamic_cast<DecimalArray &>(*(arrow_column->chunk(chunk_i)));
|
||||
for (size_t value_i = 0, length = static_cast<size_t>(chunk.length()); value_i < length; ++value_i)
|
||||
{
|
||||
column_data.emplace_back(chunk.IsNull(value_i) ? DecimalType(0) : *reinterpret_cast<const DecimalType *>(chunk.Value(value_i))); // TODO: copy column
|
||||
}
|
||||
}
|
||||
return {std::move(internal_column), std::move(internal_type), column_name};
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a null bytemap from arrow's null bitmap
|
||||
static ColumnPtr readByteMapFromArrowColumn(std::shared_ptr<arrow::ChunkedArray> & arrow_column)
|
||||
{
|
||||
/// Creates a null bytemap from arrow's null bitmap
|
||||
static ColumnPtr readByteMapFromArrowColumn(std::shared_ptr<arrow::ChunkedArray> & arrow_column)
|
||||
{
|
||||
auto nullmap_column = ColumnUInt8::create();
|
||||
PaddedPODArray<UInt8> & bytemap_data = assert_cast<ColumnVector<UInt8> &>(*nullmap_column).getData();
|
||||
bytemap_data.reserve(arrow_column->length());
|
||||
@ -261,28 +282,28 @@ namespace DB
|
||||
bytemap_data.emplace_back(chunk->IsNull(value_i));
|
||||
}
|
||||
return nullmap_column;
|
||||
}
|
||||
}
|
||||
|
||||
static ColumnPtr readOffsetsFromArrowListColumn(std::shared_ptr<arrow::ChunkedArray> & arrow_column)
|
||||
{
|
||||
static ColumnPtr readOffsetsFromArrowListColumn(std::shared_ptr<arrow::ChunkedArray> & arrow_column)
|
||||
{
|
||||
auto offsets_column = ColumnUInt64::create();
|
||||
ColumnArray::Offsets & offsets_data = assert_cast<ColumnVector<UInt64> &>(*offsets_column).getData();
|
||||
offsets_data.reserve(arrow_column->length());
|
||||
|
||||
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
|
||||
{
|
||||
arrow::ListArray & list_chunk = static_cast<arrow::ListArray &>(*(arrow_column->chunk(chunk_i)));
|
||||
arrow::ListArray & list_chunk = dynamic_cast<arrow::ListArray &>(*(arrow_column->chunk(chunk_i)));
|
||||
auto arrow_offsets_array = list_chunk.offsets();
|
||||
auto & arrow_offsets = static_cast<arrow::Int32Array &>(*arrow_offsets_array);
|
||||
auto & arrow_offsets = dynamic_cast<arrow::Int32Array &>(*arrow_offsets_array);
|
||||
auto start = offsets_data.back();
|
||||
for (int64_t i = 1; i < arrow_offsets.length(); ++i)
|
||||
offsets_data.emplace_back(start + arrow_offsets.Value(i));
|
||||
}
|
||||
return offsets_column;
|
||||
}
|
||||
}
|
||||
|
||||
static ColumnPtr readColumnWithIndexesData(std::shared_ptr<arrow::ChunkedArray> & arrow_column)
|
||||
{
|
||||
static ColumnPtr readColumnWithIndexesData(std::shared_ptr<arrow::ChunkedArray> & arrow_column)
|
||||
{
|
||||
switch (arrow_column->type()->id())
|
||||
{
|
||||
# define DISPATCH(ARROW_NUMERIC_TYPE, CPP_NUMERIC_TYPE) \
|
||||
@ -295,28 +316,28 @@ namespace DB
|
||||
default:
|
||||
throw Exception(fmt::format("Unsupported type for indexes in LowCardinality: {}.", arrow_column->type()->name()), ErrorCodes::BAD_ARGUMENTS);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static std::shared_ptr<arrow::ChunkedArray> getNestedArrowColumn(std::shared_ptr<arrow::ChunkedArray> & arrow_column)
|
||||
{
|
||||
static std::shared_ptr<arrow::ChunkedArray> getNestedArrowColumn(std::shared_ptr<arrow::ChunkedArray> & arrow_column)
|
||||
{
|
||||
arrow::ArrayVector array_vector;
|
||||
array_vector.reserve(arrow_column->num_chunks());
|
||||
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
|
||||
{
|
||||
arrow::ListArray & list_chunk = static_cast<arrow::ListArray &>(*(arrow_column->chunk(chunk_i)));
|
||||
arrow::ListArray & list_chunk = dynamic_cast<arrow::ListArray &>(*(arrow_column->chunk(chunk_i)));
|
||||
std::shared_ptr<arrow::Array> chunk = list_chunk.values();
|
||||
array_vector.emplace_back(std::move(chunk));
|
||||
}
|
||||
return std::make_shared<arrow::ChunkedArray>(array_vector);
|
||||
}
|
||||
}
|
||||
|
||||
static ColumnWithTypeAndName readColumnFromArrowColumn(
|
||||
static ColumnWithTypeAndName readColumnFromArrowColumn(
|
||||
std::shared_ptr<arrow::ChunkedArray> & arrow_column,
|
||||
const std::string & column_name,
|
||||
const std::string & format_name,
|
||||
bool is_nullable,
|
||||
std::unordered_map<String, std::shared_ptr<ColumnWithTypeAndName>> & dictionary_values)
|
||||
{
|
||||
{
|
||||
if (!is_nullable && arrow_column->null_count() && arrow_column->type()->id() != arrow::Type::LIST
|
||||
&& arrow_column->type()->id() != arrow::Type::MAP && arrow_column->type()->id() != arrow::Type::STRUCT)
|
||||
{
|
||||
@ -374,11 +395,11 @@ namespace DB
|
||||
case arrow::Type::STRUCT:
|
||||
{
|
||||
auto arrow_type = arrow_column->type();
|
||||
auto arrow_struct_type = assert_cast<arrow::StructType *>(arrow_type.get());
|
||||
auto * arrow_struct_type = assert_cast<arrow::StructType *>(arrow_type.get());
|
||||
std::vector<arrow::ArrayVector> nested_arrow_columns(arrow_struct_type->num_fields());
|
||||
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
|
||||
{
|
||||
arrow::StructArray & struct_chunk = static_cast<arrow::StructArray &>(*(arrow_column->chunk(chunk_i)));
|
||||
arrow::StructArray & struct_chunk = dynamic_cast<arrow::StructArray &>(*(arrow_column->chunk(chunk_i)));
|
||||
for (int i = 0; i < arrow_struct_type->num_fields(); ++i)
|
||||
nested_arrow_columns[i].emplace_back(struct_chunk.field(i));
|
||||
}
|
||||
@ -409,7 +430,7 @@ namespace DB
|
||||
arrow::ArrayVector dict_array;
|
||||
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
|
||||
{
|
||||
arrow::DictionaryArray & dict_chunk = static_cast<arrow::DictionaryArray &>(*(arrow_column->chunk(chunk_i)));
|
||||
arrow::DictionaryArray & dict_chunk = dynamic_cast<arrow::DictionaryArray &>(*(arrow_column->chunk(chunk_i)));
|
||||
dict_array.emplace_back(dict_chunk.dictionary());
|
||||
}
|
||||
auto arrow_dict_column = std::make_shared<arrow::ChunkedArray>(dict_array);
|
||||
@ -426,7 +447,7 @@ namespace DB
|
||||
arrow::ArrayVector indexes_array;
|
||||
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
|
||||
{
|
||||
arrow::DictionaryArray & dict_chunk = static_cast<arrow::DictionaryArray &>(*(arrow_column->chunk(chunk_i)));
|
||||
arrow::DictionaryArray & dict_chunk = dynamic_cast<arrow::DictionaryArray &>(*(arrow_column->chunk(chunk_i)));
|
||||
indexes_array.emplace_back(dict_chunk.indices());
|
||||
}
|
||||
|
||||
@ -447,48 +468,58 @@ namespace DB
|
||||
default:
|
||||
throw Exception
|
||||
{
|
||||
fmt::format(R"(Unsupported {} type "{}" of an input column "{}".)", format_name, arrow_column->type()->name(), column_name),
|
||||
ErrorCodes::UNKNOWN_TYPE
|
||||
ErrorCodes::UNKNOWN_TYPE,
|
||||
"Unsupported {} type '{}' of an input column '{}'.", format_name, arrow_column->type()->name(), column_name,
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static Block arrowSchemaToCHHeader(const arrow::Schema & schema, const std::string & format_name)
|
||||
{
|
||||
ColumnsWithTypeAndName sample_columns;
|
||||
for (const auto & field : schema.fields())
|
||||
{
|
||||
/// Create empty arrow column by it's type and convert it to ClickHouse column.
|
||||
arrow::MemoryPool* pool = arrow::default_memory_pool();
|
||||
std::unique_ptr<arrow::ArrayBuilder> array_builder;
|
||||
arrow::Status status = MakeBuilder(pool, field->type(), &array_builder);
|
||||
checkStatus(status, field->name(), format_name);
|
||||
std::shared_ptr<arrow::Array> arrow_array;
|
||||
status = array_builder->Finish(&arrow_array);
|
||||
checkStatus(status, field->name(), format_name);
|
||||
arrow::ArrayVector array_vector = {arrow_array};
|
||||
auto arrow_column = std::make_shared<arrow::ChunkedArray>(array_vector);
|
||||
std::unordered_map<std::string, std::shared_ptr<ColumnWithTypeAndName>> dict_values;
|
||||
ColumnWithTypeAndName sample_column = readColumnFromArrowColumn(arrow_column, field->name(), format_name, false, dict_values);
|
||||
sample_columns.emplace_back(std::move(sample_column));
|
||||
}
|
||||
return Block(std::move(sample_columns));
|
||||
}
|
||||
|
||||
ArrowColumnToCHColumn::ArrowColumnToCHColumn(
|
||||
// Creating CH header by arrow schema. Will be useful in task about inserting
|
||||
// data from file without knowing table structure.
|
||||
//
|
||||
//static void checkStatus(const arrow::Status & status, const String & column_name, const String & format_name)
|
||||
//{
|
||||
// if (!status.ok())
|
||||
// throw Exception{ErrorCodes::UNKNOWN_EXCEPTION, "Error with a {} column '{}': {}.", format_name, column_name, status.ToString()};
|
||||
//}
|
||||
//
|
||||
//static Block arrowSchemaToCHHeader(const arrow::Schema & schema, const std::string & format_name)
|
||||
//{
|
||||
// ColumnsWithTypeAndName sample_columns;
|
||||
// for (const auto & field : schema.fields())
|
||||
// {
|
||||
// /// Create empty arrow column by it's type and convert it to ClickHouse column.
|
||||
// arrow::MemoryPool* pool = arrow::default_memory_pool();
|
||||
// std::unique_ptr<arrow::ArrayBuilder> array_builder;
|
||||
// arrow::Status status = MakeBuilder(pool, field->type(), &array_builder);
|
||||
// checkStatus(status, field->name(), format_name);
|
||||
// std::shared_ptr<arrow::Array> arrow_array;
|
||||
// status = array_builder->Finish(&arrow_array);
|
||||
// checkStatus(status, field->name(), format_name);
|
||||
// arrow::ArrayVector array_vector = {arrow_array};
|
||||
// auto arrow_column = std::make_shared<arrow::ChunkedArray>(array_vector);
|
||||
// std::unordered_map<std::string, std::shared_ptr<ColumnWithTypeAndName>> dict_values;
|
||||
// ColumnWithTypeAndName sample_column = readColumnFromArrowColumn(arrow_column, field->name(), format_name, false, dict_values);
|
||||
// sample_columns.emplace_back(std::move(sample_column));
|
||||
// }
|
||||
// return Block(std::move(sample_columns));
|
||||
//}
|
||||
//
|
||||
//ArrowColumnToCHColumn::ArrowColumnToCHColumn(
|
||||
// const arrow::Schema & schema, const std::string & format_name_, bool import_nested_)
|
||||
// : header(arrowSchemaToCHHeader(schema, format_name_)), format_name(format_name_), import_nested(import_nested_)
|
||||
//{
|
||||
//}
|
||||
|
||||
ArrowColumnToCHColumn::ArrowColumnToCHColumn(
|
||||
const Block & header_, const std::string & format_name_, bool import_nested_)
|
||||
: header(header_), format_name(format_name_), import_nested(import_nested_)
|
||||
{
|
||||
}
|
||||
{
|
||||
}
|
||||
|
||||
ArrowColumnToCHColumn::ArrowColumnToCHColumn(
|
||||
const arrow::Schema & schema, const std::string & format_name_, bool import_nested_)
|
||||
: header(arrowSchemaToCHHeader(schema, format_name_)), format_name(format_name_), import_nested(import_nested_)
|
||||
{
|
||||
}
|
||||
|
||||
void ArrowColumnToCHColumn::arrowTableToCHChunk(Chunk & res, std::shared_ptr<arrow::Table> & table)
|
||||
{
|
||||
void ArrowColumnToCHColumn::arrowTableToCHChunk(Chunk & res, std::shared_ptr<arrow::Table> & table)
|
||||
{
|
||||
Columns columns_list;
|
||||
UInt64 num_rows = 0;
|
||||
|
||||
@ -529,8 +560,7 @@ namespace DB
|
||||
|
||||
// TODO: What if some columns were not presented? Insert NULLs? What if a column is not nullable?
|
||||
if (!read_from_nested)
|
||||
throw Exception{
|
||||
fmt::format("Column \"{}\" is not presented in input data.", header_column.name), ErrorCodes::THERE_IS_NO_COLUMN};
|
||||
throw Exception{ErrorCodes::THERE_IS_NO_COLUMN, "Column '{}' is not presented in input data.", header_column.name};
|
||||
}
|
||||
|
||||
std::shared_ptr<arrow::ChunkedArray> arrow_column = name_to_column_ptr[header_column.name];
|
||||
@ -548,7 +578,8 @@ namespace DB
|
||||
}
|
||||
|
||||
res.setColumns(columns_list, num_rows);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
#endif
|
||||
|
||||
|
@ -22,9 +22,9 @@ class ArrowColumnToCHColumn
|
||||
public:
|
||||
ArrowColumnToCHColumn(const Block & header_, const std::string & format_name_, bool import_nested_);
|
||||
|
||||
/// Create header by arrow schema. It will be useful for inserting
|
||||
/// Constructor that creates the header from an arrow schema. It will be useful for inserting
|
||||
/// data from file without knowing table structure.
|
||||
ArrowColumnToCHColumn(const arrow::Schema & schema, const std::string & format_name, bool import_nested_);
|
||||
/// ArrowColumnToCHColumn(const arrow::Schema & schema, const std::string & format_name, bool import_nested_);
|
||||
|
||||
void arrowTableToCHChunk(Chunk & res, std::shared_ptr<arrow::Table> & table);
|
||||
|
||||
@ -32,6 +32,7 @@ private:
|
||||
const Block & header;
|
||||
const std::string format_name;
|
||||
bool import_nested;
|
||||
|
||||
/// Map {column name : dictionary column}.
|
||||
/// To avoid converting dictionary from Arrow Dictionary
|
||||
/// to LowCardinality every chunk we save it and reuse.
|
||||
|
@ -46,7 +46,7 @@
|
||||
M(INT64, arrow::Int64Type) \
|
||||
M(FLOAT, arrow::FloatType) \
|
||||
M(DOUBLE, arrow::DoubleType) \
|
||||
M(STRING, arrow::StringType)
|
||||
M(BINARY, arrow::BinaryType)
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
@ -315,15 +315,15 @@ Code: 33. DB::ParsingException: Error while reading Parquet data: IOError: Not y
|
||||
1593604801 abc 42.125
|
||||
1593604801 def 7.7
|
||||
=== Try load data from nonnullable.impala.parquet
|
||||
8 [-1] [[-1,-2],[]] {'k1':-1} [{},{'k1':1},{},{}] (-1,[-1],([[(-1)]]),{})
|
||||
8 [-1] [[-1,-2],[]] {'k1':-1} [{},{'k1':1},{},{}] (-1,[-1],([[(-1,'nonnullable')]]),{})
|
||||
=== Try load data from nullable.impala.parquet
|
||||
1 [1,2,3] [[1,2],[3,4]] {'k1':1,'k2':100} [{'k1':1}] (1,[1],([[(10),(-10)],[(11)]]),{'foo':(([1.1]))})
|
||||
2 [NULL,1,2,NULL,3,NULL] [[NULL,1,2,NULL],[3,NULL,4],[],[]] {'k1':2,'k2':NULL} [{'k3':NULL,'k1':1},{},{}] (NULL,[NULL],([[(NULL),(10),(NULL),(-10),(NULL)],[(11),(NULL)],[],[]]),{'g1':(([2.2,NULL])),'g2':(([])),'g3':(([])),'g4':(([])),'g5':(([]))})
|
||||
1 [1,2,3] [[1,2],[3,4]] {'k1':1,'k2':100} [{'k1':1}] (1,[1],([[(10,'aaa'),(-10,'bbb')],[(11,'c')]]),{'foo':(([1.1]))})
|
||||
2 [NULL,1,2,NULL,3,NULL] [[NULL,1,2,NULL],[3,NULL,4],[],[]] {'k1':2,'k2':NULL} [{'k3':NULL,'k1':1},{},{}] (NULL,[NULL],([[(NULL,NULL),(10,'aaa'),(NULL,NULL),(-10,'bbb'),(NULL,NULL)],[(11,'c'),(NULL,NULL)],[],[]]),{'g1':(([2.2,NULL])),'g2':(([])),'g3':(([])),'g4':(([])),'g5':(([]))})
|
||||
3 [] [[]] {} [{},{}] (NULL,[],([]),{})
|
||||
4 [] [] {} [] (NULL,[],([]),{})
|
||||
5 [] [] {} [] (NULL,[],([]),{'foo':(([2.2,3.3]))})
|
||||
6 [] [] {} [] (NULL,[],([]),{})
|
||||
7 [] [[],[5,6]] {'k1':NULL,'k3':NULL} [] (7,[2,3,NULL],([[],[(NULL)],[]]),{})
|
||||
7 [] [[],[5,6]] {'k1':NULL,'k3':NULL} [] (7,[2,3,NULL],([[],[(NULL,NULL)],[]]),{})
|
||||
=== Try load data from nullable_list.parquet
|
||||
[1,NULL,2] [NULL,'Some string',NULL] [0.00,NULL,42.42]
|
||||
[NULL] [NULL] [NULL]
|
||||
|
@ -1 +1 @@
|
||||
`ID` Nullable(Int64), `Int_Array` Array(Nullable(Int32)), `int_array_array` Array(Array(Nullable(Int32))), `Int_Map` Map(String, Nullable(Int32)), `int_map_array` Array(Map(String, Nullable(Int32))), `nested_Struct` Tuple(Nullable(Int32), Array(Nullable(Int32)), Tuple(Array(Array(Tuple(Nullable(Int32))))), Map(String, Tuple(Tuple(Array(Nullable(Float64))))))
|
||||
`ID` Nullable(Int64), `Int_Array` Array(Nullable(Int32)), `int_array_array` Array(Array(Nullable(Int32))), `Int_Map` Map(String, Nullable(Int32)), `int_map_array` Array(Map(String, Nullable(Int32))), `nested_Struct` Tuple(Nullable(Int32), Array(Nullable(Int32)), Tuple(Array(Array(Tuple(Nullable(Int32), Nullable(String))))), Map(String, Tuple(Tuple(Array(Nullable(Float64))))))
|
||||
|
@ -1 +1 @@
|
||||
`id` Nullable(Int64), `int_array` Array(Nullable(Int32)), `int_array_Array` Array(Array(Nullable(Int32))), `int_map` Map(String, Nullable(Int32)), `int_Map_Array` Array(Map(String, Nullable(Int32))), `nested_struct` Tuple(Nullable(Int32), Array(Nullable(Int32)), Tuple(Array(Array(Tuple(Nullable(Int32))))), Map(String, Tuple(Tuple(Array(Nullable(Float64))))))
|
||||
`id` Nullable(Int64), `int_array` Array(Nullable(Int32)), `int_array_Array` Array(Array(Nullable(Int32))), `int_map` Map(String, Nullable(Int32)), `int_Map_Array` Array(Map(String, Nullable(Int32))), `nested_struct` Tuple(Nullable(Int32), Array(Nullable(Int32)), Tuple(Array(Array(Tuple(Nullable(Int32), Nullable(String))))), Map(String, Tuple(Tuple(Array(Nullable(Float64))))))
|
||||
|
Loading…
Reference in New Issue
Block a user