mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 23:21:59 +00:00
Merge pull request #25902 from Avogar/arrow-nested
Refactor ArrowColumnToCHColumn, support inserting Nested as Array(Struct) in Arrow/ORC/Parquet
This commit is contained in:
commit
8c06abee73
@ -194,6 +194,7 @@ public:
|
||||
const IColumnUnique & getDictionary() const { return dictionary.getColumnUnique(); }
|
||||
IColumnUnique & getDictionary() { return dictionary.getColumnUnique(); }
|
||||
const ColumnPtr & getDictionaryPtr() const { return dictionary.getColumnUniquePtr(); }
|
||||
ColumnPtr & getDictionaryPtr() { return dictionary.getColumnUniquePtr(); }
|
||||
/// IColumnUnique & getUnique() { return static_cast<IColumnUnique &>(*column_unique); }
|
||||
/// ColumnPtr getUniquePtr() const { return column_unique; }
|
||||
|
||||
|
@ -528,6 +528,9 @@ class IColumn;
|
||||
M(Bool, input_format_tsv_empty_as_default, false, "Treat empty fields in TSV input as default values.", 0) \
|
||||
M(Bool, input_format_tsv_enum_as_number, false, "Treat inserted enum values in TSV formats as enum indices \\N", 0) \
|
||||
M(Bool, input_format_null_as_default, true, "For text input formats initialize null fields with default values if data type of this field is not nullable", 0) \
|
||||
M(Bool, input_format_arrow_import_nested, false, "Allow to insert array of structs into Nested table in Arrow input format.", 0) \
|
||||
M(Bool, input_format_orc_import_nested, false, "Allow to insert array of structs into Nested table in ORC input format.", 0) \
|
||||
M(Bool, input_format_parquet_import_nested, false, "Allow to insert array of structs into Nested table in Parquet input format.", 0) \
|
||||
\
|
||||
M(DateTimeInputFormat, date_time_input_format, FormatSettings::DateTimeInputFormat::Basic, "Method to read DateTime from text input formats. Possible values: 'basic' and 'best_effort'.", 0) \
|
||||
M(DateTimeOutputFormat, date_time_output_format, FormatSettings::DateTimeOutputFormat::Simple, "Method to write DateTime to text output. Possible values: 'simple', 'iso', 'unix_timestamp'.", 0) \
|
||||
|
@ -208,6 +208,18 @@ void validateArraySizes(const Block & block)
|
||||
}
|
||||
}
|
||||
|
||||
std::unordered_set<String> getAllTableNames(const Block & block)
|
||||
{
|
||||
std::unordered_set<String> nested_table_names;
|
||||
for (auto & name : block.getNames())
|
||||
{
|
||||
auto nested_table_name = Nested::extractTableName(name);
|
||||
if (!nested_table_name.empty())
|
||||
nested_table_names.insert(nested_table_name);
|
||||
}
|
||||
return nested_table_names;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -28,6 +28,9 @@ namespace Nested
|
||||
|
||||
/// Check that sizes of arrays - elements of nested data structures - are equal.
|
||||
void validateArraySizes(const Block & block);
|
||||
|
||||
/// Get all nested tables names from a block.
|
||||
std::unordered_set<String> getAllTableNames(const Block & block);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -88,6 +88,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
|
||||
format_settings.json.quote_denormals = settings.output_format_json_quote_denormals;
|
||||
format_settings.null_as_default = settings.input_format_null_as_default;
|
||||
format_settings.parquet.row_group_size = settings.output_format_parquet_row_group_size;
|
||||
format_settings.parquet.import_nested = settings.input_format_parquet_import_nested;
|
||||
format_settings.pretty.charset = settings.output_format_pretty_grid_charset.toString() == "ASCII" ? FormatSettings::Pretty::Charset::ASCII : FormatSettings::Pretty::Charset::UTF8;
|
||||
format_settings.pretty.color = settings.output_format_pretty_color;
|
||||
format_settings.pretty.max_column_pad_width = settings.output_format_pretty_max_column_pad_width;
|
||||
@ -114,6 +115,8 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
|
||||
format_settings.with_names_use_header = settings.input_format_with_names_use_header;
|
||||
format_settings.write_statistics = settings.output_format_write_statistics;
|
||||
format_settings.arrow.low_cardinality_as_dictionary = settings.output_format_arrow_low_cardinality_as_dictionary;
|
||||
format_settings.arrow.import_nested = settings.input_format_arrow_import_nested;
|
||||
format_settings.orc.import_nested = settings.input_format_orc_import_nested;
|
||||
|
||||
/// Validate avro_schema_registry_url with RemoteHostFilter when non-empty and in Server context
|
||||
if (format_settings.schema.is_server)
|
||||
|
@ -53,6 +53,7 @@ struct FormatSettings
|
||||
{
|
||||
UInt64 row_group_size = 1000000;
|
||||
bool low_cardinality_as_dictionary = false;
|
||||
bool import_nested = false;
|
||||
} arrow;
|
||||
|
||||
struct
|
||||
@ -100,6 +101,7 @@ struct FormatSettings
|
||||
struct
|
||||
{
|
||||
UInt64 row_group_size = 1000000;
|
||||
bool import_nested = false;
|
||||
} parquet;
|
||||
|
||||
struct Pretty
|
||||
@ -174,6 +176,11 @@ struct FormatSettings
|
||||
bool deduce_templates_of_expressions = true;
|
||||
bool accurate_types_of_literals = true;
|
||||
} values;
|
||||
|
||||
struct
|
||||
{
|
||||
bool import_nested = false;
|
||||
} orc;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -22,8 +22,8 @@ namespace ErrorCodes
|
||||
extern const int CANNOT_READ_ALL_DATA;
|
||||
}
|
||||
|
||||
ArrowBlockInputFormat::ArrowBlockInputFormat(ReadBuffer & in_, const Block & header_, bool stream_)
|
||||
: IInputFormat(header_, in_), stream{stream_}
|
||||
ArrowBlockInputFormat::ArrowBlockInputFormat(ReadBuffer & in_, const Block & header_, bool stream_, const FormatSettings & format_settings_)
|
||||
: IInputFormat(header_, in_), stream{stream_}, format_settings(format_settings_)
|
||||
{
|
||||
}
|
||||
|
||||
@ -102,7 +102,7 @@ void ArrowBlockInputFormat::prepareReader()
|
||||
schema = file_reader->schema();
|
||||
}
|
||||
|
||||
arrow_column_to_ch_column = std::make_unique<ArrowColumnToCHColumn>(getPort().getHeader(), std::move(schema), "Arrow");
|
||||
arrow_column_to_ch_column = std::make_unique<ArrowColumnToCHColumn>(getPort().getHeader(), "Arrow", format_settings.arrow.import_nested);
|
||||
|
||||
if (stream)
|
||||
record_batch_total = -1;
|
||||
@ -119,9 +119,9 @@ void registerInputFormatProcessorArrow(FormatFactory & factory)
|
||||
[](ReadBuffer & buf,
|
||||
const Block & sample,
|
||||
const RowInputFormatParams & /* params */,
|
||||
const FormatSettings & /* format_settings */)
|
||||
const FormatSettings & format_settings)
|
||||
{
|
||||
return std::make_shared<ArrowBlockInputFormat>(buf, sample, false);
|
||||
return std::make_shared<ArrowBlockInputFormat>(buf, sample, false, format_settings);
|
||||
});
|
||||
factory.markFormatAsColumnOriented("Arrow");
|
||||
factory.registerInputFormatProcessor(
|
||||
@ -129,9 +129,9 @@ void registerInputFormatProcessorArrow(FormatFactory & factory)
|
||||
[](ReadBuffer & buf,
|
||||
const Block & sample,
|
||||
const RowInputFormatParams & /* params */,
|
||||
const FormatSettings & /* format_settings */)
|
||||
const FormatSettings & format_settings)
|
||||
{
|
||||
return std::make_shared<ArrowBlockInputFormat>(buf, sample, true);
|
||||
return std::make_shared<ArrowBlockInputFormat>(buf, sample, true, format_settings);
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -6,6 +6,7 @@
|
||||
#if USE_ARROW
|
||||
|
||||
#include <Processors/Formats/IInputFormat.h>
|
||||
#include <Formats/FormatSettings.h>
|
||||
|
||||
namespace arrow { class RecordBatchReader; }
|
||||
namespace arrow::ipc { class RecordBatchFileReader; }
|
||||
@ -19,7 +20,7 @@ class ArrowColumnToCHColumn;
|
||||
class ArrowBlockInputFormat : public IInputFormat
|
||||
{
|
||||
public:
|
||||
ArrowBlockInputFormat(ReadBuffer & in_, const Block & header_, bool stream_);
|
||||
ArrowBlockInputFormat(ReadBuffer & in_, const Block & header_, bool stream_, const FormatSettings & format_settings_);
|
||||
|
||||
void resetParser() override;
|
||||
|
||||
@ -41,6 +42,8 @@ private:
|
||||
int record_batch_total = 0;
|
||||
int record_batch_current = 0;
|
||||
|
||||
const FormatSettings format_settings;
|
||||
|
||||
void prepareReader();
|
||||
};
|
||||
|
||||
|
@ -10,10 +10,12 @@
|
||||
#include <DataTypes/DataTypeLowCardinality.h>
|
||||
#include <DataTypes/DataTypeTuple.h>
|
||||
#include <DataTypes/DataTypeMap.h>
|
||||
#include <DataTypes/DataTypeString.h>
|
||||
#include <DataTypes/DataTypeDate32.h>
|
||||
#include <DataTypes/DataTypeDate.h>
|
||||
#include <DataTypes/NestedUtils.h>
|
||||
#include <common/DateLUTImpl.h>
|
||||
#include <common/types.h>
|
||||
#include <Core/Block.h>
|
||||
#include <Processors/Chunk.h>
|
||||
#include <Columns/ColumnString.h>
|
||||
#include <Columns/ColumnNullable.h>
|
||||
@ -22,17 +24,18 @@
|
||||
#include <Columns/ColumnLowCardinality.h>
|
||||
#include <Columns/ColumnUnique.h>
|
||||
#include <Columns/ColumnMap.h>
|
||||
#include <Columns/ColumnsNumber.h>
|
||||
#include <Interpreters/castColumn.h>
|
||||
#include <algorithm>
|
||||
#include <arrow/builder.h>
|
||||
#include <arrow/array.h>
|
||||
|
||||
|
||||
/// UINT16 and UINT32 are processed separately, see comments in readColumnFromArrowColumn.
|
||||
#define FOR_ARROW_NUMERIC_TYPES(M) \
|
||||
M(arrow::Type::UINT8, DB::UInt8) \
|
||||
M(arrow::Type::INT8, DB::Int8) \
|
||||
M(arrow::Type::UINT16, DB::UInt16) \
|
||||
M(arrow::Type::INT16, DB::Int16) \
|
||||
M(arrow::Type::UINT32, DB::UInt32) \
|
||||
M(arrow::Type::INT32, DB::Int32) \
|
||||
M(arrow::Type::UINT64, DB::UInt64) \
|
||||
M(arrow::Type::INT64, DB::Int64) \
|
||||
@ -50,7 +53,6 @@
|
||||
M(arrow::Type::UINT64, DB::UInt64) \
|
||||
M(arrow::Type::INT64, DB::UInt64)
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
@ -58,47 +60,19 @@ namespace ErrorCodes
|
||||
{
|
||||
extern const int UNKNOWN_TYPE;
|
||||
extern const int VALUE_IS_OUT_OF_RANGE_OF_DATA_TYPE;
|
||||
extern const int CANNOT_CONVERT_TYPE;
|
||||
extern const int CANNOT_INSERT_NULL_IN_ORDINARY_COLUMN;
|
||||
extern const int THERE_IS_NO_COLUMN;
|
||||
extern const int BAD_ARGUMENTS;
|
||||
extern const int UNKNOWN_EXCEPTION;
|
||||
}
|
||||
|
||||
static const std::initializer_list<std::pair<arrow::Type::type, const char *>> arrow_type_to_internal_type =
|
||||
{
|
||||
{arrow::Type::UINT8, "UInt8"},
|
||||
{arrow::Type::INT8, "Int8"},
|
||||
{arrow::Type::UINT16, "UInt16"},
|
||||
{arrow::Type::INT16, "Int16"},
|
||||
{arrow::Type::UINT32, "UInt32"},
|
||||
{arrow::Type::INT32, "Int32"},
|
||||
{arrow::Type::UINT64, "UInt64"},
|
||||
{arrow::Type::INT64, "Int64"},
|
||||
{arrow::Type::HALF_FLOAT, "Float32"},
|
||||
{arrow::Type::FLOAT, "Float32"},
|
||||
{arrow::Type::DOUBLE, "Float64"},
|
||||
|
||||
{arrow::Type::BOOL, "UInt8"},
|
||||
{arrow::Type::DATE32, "Date"},
|
||||
{arrow::Type::DATE32, "Date32"},
|
||||
{arrow::Type::DATE64, "DateTime"},
|
||||
{arrow::Type::TIMESTAMP, "DateTime"},
|
||||
|
||||
{arrow::Type::STRING, "String"},
|
||||
{arrow::Type::BINARY, "String"},
|
||||
|
||||
// TODO: add other types that are convertible to internal ones:
|
||||
// 0. ENUM?
|
||||
// 1. UUID -> String
|
||||
// 2. JSON -> String
|
||||
// Full list of types: contrib/arrow/cpp/src/arrow/type.h
|
||||
};
|
||||
|
||||
/// Inserts numeric data right into internal column data to reduce an overhead
|
||||
template <typename NumericType, typename VectorType = ColumnVector<NumericType>>
|
||||
static void fillColumnWithNumericData(std::shared_ptr<arrow::ChunkedArray> & arrow_column, IColumn & internal_column)
|
||||
static ColumnWithTypeAndName readColumnWithNumericData(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
|
||||
{
|
||||
auto & column_data = static_cast<VectorType &>(internal_column).getData();
|
||||
auto internal_type = std::make_shared<DataTypeNumber<NumericType>>();
|
||||
auto internal_column = internal_type->createColumn();
|
||||
auto & column_data = static_cast<VectorType &>(*internal_column).getData();
|
||||
column_data.reserve(arrow_column->length());
|
||||
|
||||
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
|
||||
@ -110,15 +84,18 @@ static void fillColumnWithNumericData(std::shared_ptr<arrow::ChunkedArray> & arr
|
||||
const auto * raw_data = reinterpret_cast<const NumericType *>(buffer->data());
|
||||
column_data.insert_assume_reserved(raw_data, raw_data + chunk->length());
|
||||
}
|
||||
return {std::move(internal_column), std::move(internal_type), column_name};
|
||||
}
|
||||
|
||||
/// Inserts chars and offsets right into internal column data to reduce an overhead.
|
||||
/// Internal offsets are shifted by one to the right in comparison with Arrow ones. So the last offset should map to the end of all chars.
|
||||
/// Also internal strings are null terminated.
|
||||
static void fillColumnWithStringData(std::shared_ptr<arrow::ChunkedArray> & arrow_column, IColumn & internal_column)
|
||||
static ColumnWithTypeAndName readColumnWithStringData(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
|
||||
{
|
||||
PaddedPODArray<UInt8> & column_chars_t = assert_cast<ColumnString &>(internal_column).getChars();
|
||||
PaddedPODArray<UInt64> & column_offsets = assert_cast<ColumnString &>(internal_column).getOffsets();
|
||||
auto internal_type = std::make_shared<DataTypeString>();
|
||||
auto internal_column = internal_type->createColumn();
|
||||
PaddedPODArray<UInt8> & column_chars_t = assert_cast<ColumnString &>(*internal_column).getChars();
|
||||
PaddedPODArray<UInt64> & column_offsets = assert_cast<ColumnString &>(*internal_column).getOffsets();
|
||||
|
||||
size_t chars_t_size = 0;
|
||||
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
|
||||
@ -154,11 +131,14 @@ static void fillColumnWithStringData(std::shared_ptr<arrow::ChunkedArray> & arro
|
||||
column_offsets.emplace_back(column_chars_t.size());
|
||||
}
|
||||
}
|
||||
return {std::move(internal_column), std::move(internal_type), column_name};
|
||||
}
|
||||
|
||||
static void fillColumnWithBooleanData(std::shared_ptr<arrow::ChunkedArray> & arrow_column, IColumn & internal_column)
|
||||
static ColumnWithTypeAndName readColumnWithBooleanData(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
|
||||
{
|
||||
auto & column_data = assert_cast<ColumnVector<UInt8> &>(internal_column).getData();
|
||||
auto internal_type = std::make_shared<DataTypeUInt8>();
|
||||
auto internal_column = internal_type->createColumn();
|
||||
auto & column_data = assert_cast<ColumnVector<UInt8> &>(*internal_column).getData();
|
||||
column_data.reserve(arrow_column->length());
|
||||
|
||||
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
|
||||
@ -170,35 +150,14 @@ static void fillColumnWithBooleanData(std::shared_ptr<arrow::ChunkedArray> & arr
|
||||
for (size_t bool_i = 0; bool_i != static_cast<size_t>(chunk.length()); ++bool_i)
|
||||
column_data.emplace_back(chunk.Value(bool_i));
|
||||
}
|
||||
return {std::move(internal_column), std::move(internal_type), column_name};
|
||||
}
|
||||
|
||||
/// Arrow stores Parquet::DATE in Int32, while ClickHouse stores Date in UInt16. Therefore, it should be checked before saving
|
||||
static void fillColumnWithDate32Data(std::shared_ptr<arrow::ChunkedArray> & arrow_column, IColumn & internal_column)
|
||||
static ColumnWithTypeAndName readColumnWithDate32Data(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
|
||||
{
|
||||
PaddedPODArray<UInt16> & column_data = assert_cast<ColumnVector<UInt16> &>(internal_column).getData();
|
||||
column_data.reserve(arrow_column->length());
|
||||
|
||||
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
|
||||
{
|
||||
arrow::Date32Array & chunk = dynamic_cast<arrow::Date32Array &>(*(arrow_column->chunk(chunk_i)));
|
||||
|
||||
for (size_t value_i = 0, length = static_cast<size_t>(chunk.length()); value_i < length; ++value_i)
|
||||
{
|
||||
UInt32 days_num = static_cast<UInt32>(chunk.Value(value_i));
|
||||
|
||||
if (days_num > DATE_LUT_MAX_DAY_NUM)
|
||||
throw Exception(ErrorCodes::VALUE_IS_OUT_OF_RANGE_OF_DATA_TYPE,
|
||||
"Input value {} of a column '{}' is greater than max allowed Date value, which is {}",
|
||||
days_num, internal_column.getName(), DATE_LUT_MAX_DAY_NUM);
|
||||
|
||||
column_data.emplace_back(days_num);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void fillDate32ColumnWithDate32Data(std::shared_ptr<arrow::ChunkedArray> & arrow_column, IColumn & internal_column)
|
||||
{
|
||||
PaddedPODArray<Int32> & column_data = assert_cast<ColumnVector<Int32> &>(internal_column).getData();
|
||||
auto internal_type = std::make_shared<DataTypeDate32>();
|
||||
auto internal_column = internal_type->createColumn();
|
||||
PaddedPODArray<Int32> & column_data = assert_cast<ColumnVector<Int32> &>(*internal_column).getData();
|
||||
column_data.reserve(arrow_column->length());
|
||||
|
||||
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
|
||||
@ -209,18 +168,21 @@ static void fillDate32ColumnWithDate32Data(std::shared_ptr<arrow::ChunkedArray>
|
||||
{
|
||||
Int32 days_num = static_cast<Int32>(chunk.Value(value_i));
|
||||
if (days_num > DATE_LUT_MAX_EXTEND_DAY_NUM)
|
||||
throw Exception(ErrorCodes::VALUE_IS_OUT_OF_RANGE_OF_DATA_TYPE,
|
||||
"Input value {} of a column '{}' is greater than max allowed Date value, which is {}", days_num, internal_column.getName(), DATE_LUT_MAX_DAY_NUM);
|
||||
throw Exception{ErrorCodes::VALUE_IS_OUT_OF_RANGE_OF_DATA_TYPE,
|
||||
"Input value {} of a column \"{}\" is greater than max allowed Date value, which is {}", days_num, column_name, DATE_LUT_MAX_DAY_NUM};
|
||||
|
||||
column_data.emplace_back(days_num);
|
||||
}
|
||||
}
|
||||
return {std::move(internal_column), std::move(internal_type), column_name};
|
||||
}
|
||||
|
||||
/// Arrow stores Parquet::DATETIME in Int64, while ClickHouse stores DateTime in UInt32. Therefore, it should be checked before saving
|
||||
static void fillColumnWithDate64Data(std::shared_ptr<arrow::ChunkedArray> & arrow_column, IColumn & internal_column)
|
||||
static ColumnWithTypeAndName readColumnWithDate64Data(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
|
||||
{
|
||||
auto & column_data = assert_cast<ColumnVector<UInt32> &>(internal_column).getData();
|
||||
auto internal_type = std::make_shared<DataTypeDateTime>();
|
||||
auto internal_column = internal_type->createColumn();
|
||||
auto & column_data = assert_cast<ColumnVector<UInt32> &>(*internal_column).getData();
|
||||
column_data.reserve(arrow_column->length());
|
||||
|
||||
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
|
||||
@ -232,11 +194,14 @@ static void fillColumnWithDate64Data(std::shared_ptr<arrow::ChunkedArray> & arro
|
||||
column_data.emplace_back(timestamp);
|
||||
}
|
||||
}
|
||||
return {std::move(internal_column), std::move(internal_type), column_name};
|
||||
}
|
||||
|
||||
static void fillColumnWithTimestampData(std::shared_ptr<arrow::ChunkedArray> & arrow_column, IColumn & internal_column)
|
||||
static ColumnWithTypeAndName readColumnWithTimestampData(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
|
||||
{
|
||||
auto & column_data = assert_cast<ColumnVector<UInt32> &>(internal_column).getData();
|
||||
auto internal_type = std::make_shared<DataTypeUInt32>();
|
||||
auto internal_column = internal_type->createColumn();
|
||||
auto & column_data = assert_cast<ColumnVector<UInt32> &>(*internal_column).getData();
|
||||
column_data.reserve(arrow_column->length());
|
||||
|
||||
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
|
||||
@ -268,29 +233,35 @@ static void fillColumnWithTimestampData(std::shared_ptr<arrow::ChunkedArray> & a
|
||||
column_data.emplace_back(timestamp);
|
||||
}
|
||||
}
|
||||
return {std::move(internal_column), std::move(internal_type), column_name};
|
||||
}
|
||||
|
||||
template <typename DecimalType, typename DecimalArray>
|
||||
static void fillColumnWithDecimalData(std::shared_ptr<arrow::ChunkedArray> & arrow_column, IColumn & internal_column)
|
||||
static ColumnWithTypeAndName readColumnWithDecimalData(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
|
||||
{
|
||||
auto & column = assert_cast<ColumnDecimal<DecimalType> &>(internal_column);
|
||||
const auto * arrow_decimal_type = static_cast<arrow::DecimalType *>(arrow_column->type().get());
|
||||
auto internal_type = std::make_shared<DataTypeDecimal<DecimalType>>(arrow_decimal_type->precision(), arrow_decimal_type->scale());
|
||||
auto internal_column = internal_type->createColumn();
|
||||
auto & column = assert_cast<ColumnDecimal<DecimalType> &>(*internal_column);
|
||||
auto & column_data = column.getData();
|
||||
column_data.reserve(arrow_column->length());
|
||||
|
||||
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
|
||||
{
|
||||
auto & chunk = static_cast<DecimalArray &>(*(arrow_column->chunk(chunk_i)));
|
||||
auto & chunk = dynamic_cast<DecimalArray &>(*(arrow_column->chunk(chunk_i)));
|
||||
for (size_t value_i = 0, length = static_cast<size_t>(chunk.length()); value_i < length; ++value_i)
|
||||
{
|
||||
column_data.emplace_back(chunk.IsNull(value_i) ? DecimalType(0) : *reinterpret_cast<const DecimalType *>(chunk.Value(value_i))); // TODO: copy column
|
||||
}
|
||||
}
|
||||
return {std::move(internal_column), std::move(internal_type), column_name};
|
||||
}
|
||||
|
||||
/// Creates a null bytemap from arrow's null bitmap
|
||||
static void fillByteMapFromArrowColumn(std::shared_ptr<arrow::ChunkedArray> & arrow_column, IColumn & bytemap)
|
||||
static ColumnPtr readByteMapFromArrowColumn(std::shared_ptr<arrow::ChunkedArray> & arrow_column)
|
||||
{
|
||||
PaddedPODArray<UInt8> & bytemap_data = assert_cast<ColumnVector<UInt8> &>(bytemap).getData();
|
||||
auto nullmap_column = ColumnUInt8::create();
|
||||
PaddedPODArray<UInt8> & bytemap_data = assert_cast<ColumnVector<UInt8> &>(*nullmap_column).getData();
|
||||
bytemap_data.reserve(arrow_column->length());
|
||||
|
||||
for (size_t chunk_i = 0; chunk_i != static_cast<size_t>(arrow_column->num_chunks()); ++chunk_i)
|
||||
@ -300,11 +271,13 @@ static void fillByteMapFromArrowColumn(std::shared_ptr<arrow::ChunkedArray> & ar
|
||||
for (size_t value_i = 0; value_i != static_cast<size_t>(chunk->length()); ++value_i)
|
||||
bytemap_data.emplace_back(chunk->IsNull(value_i));
|
||||
}
|
||||
return nullmap_column;
|
||||
}
|
||||
|
||||
static void fillOffsetsFromArrowListColumn(std::shared_ptr<arrow::ChunkedArray> & arrow_column, IColumn & offsets)
|
||||
static ColumnPtr readOffsetsFromArrowListColumn(std::shared_ptr<arrow::ChunkedArray> & arrow_column)
|
||||
{
|
||||
ColumnArray::Offsets & offsets_data = assert_cast<ColumnVector<UInt64> &>(offsets).getData();
|
||||
auto offsets_column = ColumnUInt64::create();
|
||||
ColumnArray::Offsets & offsets_data = assert_cast<ColumnVector<UInt64> &>(*offsets_column).getData();
|
||||
offsets_data.reserve(arrow_column->length());
|
||||
|
||||
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
|
||||
@ -316,17 +289,17 @@ static void fillOffsetsFromArrowListColumn(std::shared_ptr<arrow::ChunkedArray>
|
||||
for (int64_t i = 1; i < arrow_offsets.length(); ++i)
|
||||
offsets_data.emplace_back(start + arrow_offsets.Value(i));
|
||||
}
|
||||
return offsets_column;
|
||||
}
|
||||
static ColumnPtr createAndFillColumnWithIndexesData(std::shared_ptr<arrow::ChunkedArray> & arrow_column)
|
||||
|
||||
static ColumnPtr readColumnWithIndexesData(std::shared_ptr<arrow::ChunkedArray> & arrow_column)
|
||||
{
|
||||
switch (arrow_column->type()->id())
|
||||
{
|
||||
# define DISPATCH(ARROW_NUMERIC_TYPE, CPP_NUMERIC_TYPE) \
|
||||
case ARROW_NUMERIC_TYPE: \
|
||||
{ \
|
||||
auto column = DataTypeNumber<CPP_NUMERIC_TYPE>().createColumn(); \
|
||||
fillColumnWithNumericData<CPP_NUMERIC_TYPE>(arrow_column, *column); \
|
||||
return column; \
|
||||
return readColumnWithNumericData<CPP_NUMERIC_TYPE>(arrow_column, "").column; \
|
||||
}
|
||||
FOR_ARROW_INDEXES_TYPES(DISPATCH)
|
||||
# undef DISPATCH
|
||||
@ -335,75 +308,8 @@ static ColumnPtr createAndFillColumnWithIndexesData(std::shared_ptr<arrow::Chunk
|
||||
}
|
||||
}
|
||||
|
||||
static void readColumnFromArrowColumn(
|
||||
std::shared_ptr<arrow::ChunkedArray> & arrow_column,
|
||||
IColumn & internal_column,
|
||||
const std::string & column_name,
|
||||
const std::string & format_name,
|
||||
bool is_nullable,
|
||||
std::unordered_map<String, ColumnPtr> dictionary_values)
|
||||
static std::shared_ptr<arrow::ChunkedArray> getNestedArrowColumn(std::shared_ptr<arrow::ChunkedArray> & arrow_column)
|
||||
{
|
||||
if (internal_column.isNullable())
|
||||
{
|
||||
ColumnNullable & column_nullable = assert_cast<ColumnNullable &>(internal_column);
|
||||
readColumnFromArrowColumn(
|
||||
arrow_column, column_nullable.getNestedColumn(), column_name, format_name, true, dictionary_values);
|
||||
fillByteMapFromArrowColumn(arrow_column, column_nullable.getNullMapColumn());
|
||||
return;
|
||||
}
|
||||
|
||||
/// TODO: check if a column is const?
|
||||
if (!is_nullable && arrow_column->null_count() && arrow_column->type()->id() != arrow::Type::LIST
|
||||
&& arrow_column->type()->id() != arrow::Type::MAP && arrow_column->type()->id() != arrow::Type::STRUCT)
|
||||
{
|
||||
throw Exception
|
||||
{
|
||||
ErrorCodes::CANNOT_INSERT_NULL_IN_ORDINARY_COLUMN,
|
||||
"Can not insert NULL data into non-nullable column \"{}\".", column_name
|
||||
};
|
||||
}
|
||||
|
||||
switch (arrow_column->type()->id())
|
||||
{
|
||||
case arrow::Type::STRING:
|
||||
case arrow::Type::BINARY:
|
||||
//case arrow::Type::FIXED_SIZE_BINARY:
|
||||
fillColumnWithStringData(arrow_column, internal_column);
|
||||
break;
|
||||
case arrow::Type::BOOL:
|
||||
fillColumnWithBooleanData(arrow_column, internal_column);
|
||||
break;
|
||||
case arrow::Type::DATE32:
|
||||
if (WhichDataType(internal_column.getDataType()).isUInt16())
|
||||
{
|
||||
fillColumnWithDate32Data(arrow_column, internal_column);
|
||||
}
|
||||
else
|
||||
{
|
||||
fillDate32ColumnWithDate32Data(arrow_column, internal_column);
|
||||
}
|
||||
break;
|
||||
case arrow::Type::DATE64:
|
||||
fillColumnWithDate64Data(arrow_column, internal_column);
|
||||
break;
|
||||
case arrow::Type::TIMESTAMP:
|
||||
fillColumnWithTimestampData(arrow_column, internal_column);
|
||||
break;
|
||||
#if defined(ARCADIA_BUILD)
|
||||
case arrow::Type::DECIMAL:
|
||||
fillColumnWithDecimalData<Decimal128, arrow::Decimal128Array>(arrow_column, internal_column /*, internal_nested_type*/);
|
||||
break;
|
||||
#else
|
||||
case arrow::Type::DECIMAL128:
|
||||
fillColumnWithDecimalData<Decimal128, arrow::Decimal128Array>(arrow_column, internal_column /*, internal_nested_type*/);
|
||||
break;
|
||||
case arrow::Type::DECIMAL256:
|
||||
fillColumnWithDecimalData<Decimal256, arrow::Decimal256Array>(arrow_column, internal_column /*, internal_nested_type*/);
|
||||
break;
|
||||
#endif
|
||||
case arrow::Type::MAP: [[fallthrough]];
|
||||
case arrow::Type::LIST:
|
||||
{
|
||||
arrow::ArrayVector array_vector;
|
||||
array_vector.reserve(arrow_column->num_chunks());
|
||||
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
|
||||
@ -412,43 +318,117 @@ static void readColumnFromArrowColumn(
|
||||
std::shared_ptr<arrow::Array> chunk = list_chunk.values();
|
||||
array_vector.emplace_back(std::move(chunk));
|
||||
}
|
||||
auto arrow_nested_column = std::make_shared<arrow::ChunkedArray>(array_vector);
|
||||
return std::make_shared<arrow::ChunkedArray>(array_vector);
|
||||
}
|
||||
|
||||
ColumnArray & column_array = arrow_column->type()->id() == arrow::Type::MAP
|
||||
? assert_cast<ColumnMap &>(internal_column).getNestedColumn()
|
||||
: assert_cast<ColumnArray &>(internal_column);
|
||||
static ColumnWithTypeAndName readColumnFromArrowColumn(
|
||||
std::shared_ptr<arrow::ChunkedArray> & arrow_column,
|
||||
const std::string & column_name,
|
||||
const std::string & format_name,
|
||||
bool is_nullable,
|
||||
std::unordered_map<String, std::shared_ptr<ColumnWithTypeAndName>> & dictionary_values)
|
||||
{
|
||||
if (!is_nullable && arrow_column->null_count() && arrow_column->type()->id() != arrow::Type::LIST
|
||||
&& arrow_column->type()->id() != arrow::Type::MAP && arrow_column->type()->id() != arrow::Type::STRUCT)
|
||||
{
|
||||
auto nested_column = readColumnFromArrowColumn(arrow_column, column_name, format_name, true, dictionary_values);
|
||||
auto nullmap_column = readByteMapFromArrowColumn(arrow_column);
|
||||
auto nullable_type = std::make_shared<DataTypeNullable>(std::move(nested_column.type));
|
||||
auto nullable_column = ColumnNullable::create(std::move(nested_column.column), std::move(nullmap_column));
|
||||
return {std::move(nullable_column), std::move(nullable_type), column_name};
|
||||
}
|
||||
|
||||
readColumnFromArrowColumn(
|
||||
arrow_nested_column, column_array.getData(), column_name, format_name, false, dictionary_values);
|
||||
switch (arrow_column->type()->id())
|
||||
{
|
||||
case arrow::Type::STRING:
|
||||
case arrow::Type::BINARY:
|
||||
//case arrow::Type::FIXED_SIZE_BINARY:
|
||||
return readColumnWithStringData(arrow_column, column_name);
|
||||
case arrow::Type::BOOL:
|
||||
return readColumnWithBooleanData(arrow_column, column_name);
|
||||
case arrow::Type::DATE32:
|
||||
return readColumnWithDate32Data(arrow_column, column_name);
|
||||
case arrow::Type::DATE64:
|
||||
return readColumnWithDate64Data(arrow_column, column_name);
|
||||
// ClickHouse writes Date as arrow UINT16 and DateTime as arrow UINT32,
|
||||
// so, read UINT16 as Date and UINT32 as DateTime to perform correct conversion
|
||||
// between Date and DateTime further.
|
||||
case arrow::Type::UINT16:
|
||||
{
|
||||
auto column = readColumnWithNumericData<UInt16>(arrow_column, column_name);
|
||||
column.type = std::make_shared<DataTypeDate>();
|
||||
return column;
|
||||
}
|
||||
case arrow::Type::UINT32:
|
||||
{
|
||||
auto column = readColumnWithNumericData<UInt32>(arrow_column, column_name);
|
||||
column.type = std::make_shared<DataTypeDateTime>();
|
||||
return column;
|
||||
}
|
||||
case arrow::Type::TIMESTAMP:
|
||||
return readColumnWithTimestampData(arrow_column, column_name);
|
||||
#if defined(ARCADIA_BUILD)
|
||||
case arrow::Type::DECIMAL:
|
||||
return readColumnWithDecimalData<Decimal128, arrow::Decimal128Array>(arrow_column, column_name);
|
||||
#else
|
||||
case arrow::Type::DECIMAL128:
|
||||
return readColumnWithDecimalData<Decimal128, arrow::Decimal128Array>(arrow_column, column_name);
|
||||
case arrow::Type::DECIMAL256:
|
||||
return readColumnWithDecimalData<Decimal256, arrow::Decimal256Array>(arrow_column, column_name);
|
||||
#endif
|
||||
case arrow::Type::MAP:
|
||||
{
|
||||
auto arrow_nested_column = getNestedArrowColumn(arrow_column);
|
||||
auto nested_column = readColumnFromArrowColumn(arrow_nested_column, column_name, format_name, false, dictionary_values);
|
||||
auto offsets_column = readOffsetsFromArrowListColumn(arrow_column);
|
||||
|
||||
fillOffsetsFromArrowListColumn(arrow_column, column_array.getOffsetsColumn());
|
||||
break;
|
||||
const auto * tuple_column = assert_cast<const ColumnTuple *>(nested_column.column.get());
|
||||
const auto * tuple_type = assert_cast<const DataTypeTuple *>(nested_column.type.get());
|
||||
auto map_column = ColumnMap::create(std::move(tuple_column->getColumnPtr(0)), std::move(tuple_column->getColumnPtr(1)), std::move(offsets_column));
|
||||
auto map_type = std::make_shared<DataTypeMap>(tuple_type->getElements()[0], tuple_type->getElements()[1]);
|
||||
return {std::move(map_column), std::move(map_type), column_name};
|
||||
}
|
||||
case arrow::Type::LIST:
|
||||
{
|
||||
auto arrow_nested_column = getNestedArrowColumn(arrow_column);
|
||||
auto nested_column = readColumnFromArrowColumn(arrow_nested_column, column_name, format_name, false, dictionary_values);
|
||||
auto offsets_column = readOffsetsFromArrowListColumn(arrow_column);
|
||||
auto array_column = ColumnArray::create(std::move(nested_column.column), std::move(offsets_column));
|
||||
auto array_type = std::make_shared<DataTypeArray>(nested_column.type);
|
||||
return {std::move(array_column), std::move(array_type), column_name};
|
||||
}
|
||||
case arrow::Type::STRUCT:
|
||||
{
|
||||
ColumnTuple & column_tuple = assert_cast<ColumnTuple &>(internal_column);
|
||||
int fields_count = column_tuple.tupleSize();
|
||||
std::vector<arrow::ArrayVector> nested_arrow_columns(fields_count);
|
||||
auto arrow_type = arrow_column->type();
|
||||
auto * arrow_struct_type = assert_cast<arrow::StructType *>(arrow_type.get());
|
||||
std::vector<arrow::ArrayVector> nested_arrow_columns(arrow_struct_type->num_fields());
|
||||
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
|
||||
{
|
||||
arrow::StructArray & struct_chunk = dynamic_cast<arrow::StructArray &>(*(arrow_column->chunk(chunk_i)));
|
||||
for (int i = 0; i < fields_count; ++i)
|
||||
for (int i = 0; i < arrow_struct_type->num_fields(); ++i)
|
||||
nested_arrow_columns[i].emplace_back(struct_chunk.field(i));
|
||||
}
|
||||
|
||||
for (int i = 0; i != fields_count; ++i)
|
||||
Columns tuple_elements;
|
||||
DataTypes tuple_types;
|
||||
std::vector<String> tuple_names;
|
||||
|
||||
for (int i = 0; i != arrow_struct_type->num_fields(); ++i)
|
||||
{
|
||||
auto nested_arrow_column = std::make_shared<arrow::ChunkedArray>(nested_arrow_columns[i]);
|
||||
readColumnFromArrowColumn(
|
||||
nested_arrow_column, column_tuple.getColumn(i), column_name, format_name, false, dictionary_values);
|
||||
auto element = readColumnFromArrowColumn(nested_arrow_column, arrow_struct_type->field(i)->name(), format_name, false, dictionary_values);
|
||||
tuple_elements.emplace_back(std::move(element.column));
|
||||
tuple_types.emplace_back(std::move(element.type));
|
||||
tuple_names.emplace_back(std::move(element.name));
|
||||
}
|
||||
break;
|
||||
|
||||
auto tuple_column = ColumnTuple::create(std::move(tuple_elements));
|
||||
auto tuple_type = std::make_shared<DataTypeTuple>(std::move(tuple_types), std::move(tuple_names));
|
||||
return {std::move(tuple_column), std::move(tuple_type), column_name};
|
||||
}
|
||||
case arrow::Type::DICTIONARY:
|
||||
{
|
||||
ColumnLowCardinality & column_lc = assert_cast<ColumnLowCardinality &>(internal_column);
|
||||
auto & dict_values = dictionary_values[column_name];
|
||||
|
||||
/// Load dictionary values only once and reuse it.
|
||||
if (!dict_values)
|
||||
{
|
||||
@ -459,14 +439,14 @@ static void readColumnFromArrowColumn(
|
||||
dict_array.emplace_back(dict_chunk.dictionary());
|
||||
}
|
||||
auto arrow_dict_column = std::make_shared<arrow::ChunkedArray>(dict_array);
|
||||
auto dict_column = readColumnFromArrowColumn(arrow_dict_column, column_name, format_name, false, dictionary_values);
|
||||
|
||||
auto dict_column = IColumn::mutate(column_lc.getDictionaryPtr());
|
||||
auto * uniq_column = static_cast<IColumnUnique *>(dict_column.get());
|
||||
auto values_column = uniq_column->getNestedColumn()->cloneEmpty();
|
||||
readColumnFromArrowColumn(
|
||||
arrow_dict_column, *values_column, column_name, format_name, false, dictionary_values);
|
||||
uniq_column->uniqueInsertRangeFrom(*values_column, 0, values_column->size());
|
||||
dict_values = std::move(dict_column);
|
||||
/// We should convert read column to ColumnUnique.
|
||||
auto tmp_lc_column = DataTypeLowCardinality(dict_column.type).createColumn();
|
||||
auto tmp_dict_column = IColumn::mutate(assert_cast<ColumnLowCardinality *>(tmp_lc_column.get())->getDictionaryPtr());
|
||||
static_cast<IColumnUnique *>(tmp_dict_column.get())->uniqueInsertRangeFrom(*dict_column.column, 0, dict_column.column->size());
|
||||
dict_column.column = std::move(tmp_dict_column);
|
||||
dict_values = std::make_shared<ColumnWithTypeAndName>(std::move(dict_column));
|
||||
}
|
||||
|
||||
arrow::ArrayVector indexes_array;
|
||||
@ -477,17 +457,14 @@ static void readColumnFromArrowColumn(
|
||||
}
|
||||
|
||||
auto arrow_indexes_column = std::make_shared<arrow::ChunkedArray>(indexes_array);
|
||||
auto indexes_column = createAndFillColumnWithIndexesData(arrow_indexes_column);
|
||||
|
||||
auto new_column_lc = ColumnLowCardinality::create(dict_values, std::move(indexes_column));
|
||||
column_lc = std::move(*new_column_lc);
|
||||
break;
|
||||
auto indexes_column = readColumnWithIndexesData(arrow_indexes_column);
|
||||
auto lc_column = ColumnLowCardinality::create(dict_values->column, std::move(indexes_column));
|
||||
auto lc_type = std::make_shared<DataTypeLowCardinality>(dict_values->type);
|
||||
return {std::move(lc_column), std::move(lc_type), column_name};
|
||||
}
|
||||
# define DISPATCH(ARROW_NUMERIC_TYPE, CPP_NUMERIC_TYPE) \
|
||||
case ARROW_NUMERIC_TYPE: \
|
||||
fillColumnWithNumericData<CPP_NUMERIC_TYPE>(arrow_column, internal_column); \
|
||||
break;
|
||||
|
||||
return readColumnWithNumericData<CPP_NUMERIC_TYPE>(arrow_column, column_name);
|
||||
FOR_ARROW_NUMERIC_TYPES(DISPATCH)
|
||||
# undef DISPATCH
|
||||
// TODO: support TIMESTAMP_MICROS and TIMESTAMP_MILLIS with truncated micro- and milliseconds?
|
||||
@ -499,140 +476,48 @@ static void readColumnFromArrowColumn(
|
||||
}
|
||||
}
|
||||
|
||||
static DataTypePtr getInternalType(
|
||||
std::shared_ptr<arrow::DataType> arrow_type,
|
||||
const DataTypePtr & column_type,
|
||||
const std::string & column_name,
|
||||
const std::string & format_name)
|
||||
|
||||
// Creating CH header by arrow schema. Will be useful in task about inserting
|
||||
// data from file without knowing table structure.
|
||||
|
||||
static void checkStatus(const arrow::Status & status, const String & column_name, const String & format_name)
|
||||
{
|
||||
if (column_type->isNullable())
|
||||
{
|
||||
DataTypePtr nested_type = assert_cast<const DataTypeNullable *>(column_type.get())->getNestedType();
|
||||
return makeNullable(getInternalType(arrow_type, nested_type, column_name, format_name));
|
||||
}
|
||||
|
||||
#if defined(ARCADIA_BUILD)
|
||||
if (arrow_type->id() == arrow::Type::DECIMAL)
|
||||
{
|
||||
const auto & decimal_type = dynamic_cast<const arrow::DecimalType &>(*arrow_type);
|
||||
return std::make_shared<DataTypeDecimal<Decimal128>>(decimal_type.precision(), decimal_type.scale());
|
||||
}
|
||||
#else
|
||||
if (arrow_type->id() == arrow::Type::DECIMAL128)
|
||||
{
|
||||
const auto & decimal_type = dynamic_cast<const arrow::DecimalType &>(*arrow_type);
|
||||
return std::make_shared<DataTypeDecimal<Decimal128>>(decimal_type.precision(), decimal_type.scale());
|
||||
}
|
||||
|
||||
if (arrow_type->id() == arrow::Type::DECIMAL256)
|
||||
{
|
||||
const auto & decimal_type = dynamic_cast<const arrow::DecimalType &>(*arrow_type);
|
||||
return std::make_shared<DataTypeDecimal<Decimal256>>(decimal_type.precision(), decimal_type.scale());
|
||||
}
|
||||
#endif
|
||||
|
||||
if (arrow_type->id() == arrow::Type::LIST)
|
||||
{
|
||||
const auto & list_type = dynamic_cast<const arrow::ListType &>(*arrow_type);
|
||||
auto list_nested_type = list_type.value_type();
|
||||
|
||||
const DataTypeArray * array_type = typeid_cast<const DataTypeArray *>(column_type.get());
|
||||
if (!array_type)
|
||||
throw Exception{ErrorCodes::CANNOT_CONVERT_TYPE,
|
||||
"Cannot convert arrow LIST type to a not Array ClickHouse type {}.", column_type->getName()};
|
||||
|
||||
return std::make_shared<DataTypeArray>(getInternalType(list_nested_type, array_type->getNestedType(), column_name, format_name));
|
||||
}
|
||||
|
||||
if (arrow_type->id() == arrow::Type::STRUCT)
|
||||
{
|
||||
const auto & struct_type = dynamic_cast<const arrow::StructType &>(*arrow_type);
|
||||
const DataTypeTuple * tuple_type = typeid_cast<const DataTypeTuple *>(column_type.get());
|
||||
if (!tuple_type)
|
||||
throw Exception{ErrorCodes::CANNOT_CONVERT_TYPE,
|
||||
"Cannot convert arrow STRUCT type to a not Tuple ClickHouse type {}.", column_type->getName()};
|
||||
|
||||
const DataTypes & tuple_nested_types = tuple_type->getElements();
|
||||
int internal_fields_num = tuple_nested_types.size();
|
||||
/// If internal column has less elements then arrow struct, we will select only first internal_fields_num columns.
|
||||
if (internal_fields_num > struct_type.num_fields())
|
||||
throw Exception(
|
||||
ErrorCodes::CANNOT_CONVERT_TYPE,
|
||||
"Cannot convert arrow STRUCT with {} fields to a ClickHouse Tuple with {} elements: {}.",
|
||||
struct_type.num_fields(),
|
||||
internal_fields_num,
|
||||
column_type->getName());
|
||||
|
||||
DataTypes nested_types;
|
||||
for (int i = 0; i < internal_fields_num; ++i)
|
||||
nested_types.push_back(getInternalType(struct_type.field(i)->type(), tuple_nested_types[i], column_name, format_name));
|
||||
|
||||
return std::make_shared<DataTypeTuple>(std::move(nested_types));
|
||||
}
|
||||
|
||||
if (arrow_type->id() == arrow::Type::DICTIONARY)
|
||||
{
|
||||
const auto & arrow_dict_type = dynamic_cast<const arrow::DictionaryType &>(*arrow_type);
|
||||
const auto * lc_type = typeid_cast<const DataTypeLowCardinality *>(column_type.get());
|
||||
/// We allow to insert arrow dictionary into a non-LowCardinality column.
|
||||
const auto & dict_type = lc_type ? lc_type->getDictionaryType() : column_type;
|
||||
return std::make_shared<DataTypeLowCardinality>(getInternalType(arrow_dict_type.value_type(), dict_type, column_name, format_name));
|
||||
}
|
||||
|
||||
if (arrow_type->id() == arrow::Type::MAP)
|
||||
{
|
||||
const auto & arrow_map_type = typeid_cast<const arrow::MapType &>(*arrow_type);
|
||||
const auto * map_type = typeid_cast<const DataTypeMap *>(column_type.get());
|
||||
if (!map_type)
|
||||
throw Exception{ErrorCodes::CANNOT_CONVERT_TYPE, "Cannot convert arrow MAP type to a not Map ClickHouse type {}.", column_type->getName()};
|
||||
|
||||
return std::make_shared<DataTypeMap>(
|
||||
getInternalType(arrow_map_type.key_type(), map_type->getKeyType(), column_name, format_name),
|
||||
getInternalType(arrow_map_type.item_type(), map_type->getValueType(), column_name, format_name));
|
||||
}
|
||||
|
||||
if (arrow_type->id() == arrow::Type::UINT16
|
||||
&& (isDate(column_type) || isDateTime(column_type) || isDate32(column_type) || isDateTime64(column_type)))
|
||||
{
|
||||
/// Read UInt16 as Date. It will allow correct conversion to DateTime further.
|
||||
return std::make_shared<DataTypeDate>();
|
||||
}
|
||||
|
||||
auto filter = [=](auto && elem)
|
||||
{
|
||||
auto which = WhichDataType(column_type);
|
||||
if (arrow_type->id() == arrow::Type::DATE32 && which.isDateOrDate32())
|
||||
{
|
||||
return (strcmp(elem.second, "Date") == 0 && which.isDate())
|
||||
|| (strcmp(elem.second, "Date32") == 0 && which.isDate32());
|
||||
}
|
||||
else
|
||||
{
|
||||
return elem.first == arrow_type->id();
|
||||
}
|
||||
};
|
||||
if (const auto * internal_type_it = std::find_if(arrow_type_to_internal_type.begin(), arrow_type_to_internal_type.end(), filter);
|
||||
internal_type_it != arrow_type_to_internal_type.end())
|
||||
{
|
||||
return DataTypeFactory::instance().get(internal_type_it->second);
|
||||
}
|
||||
|
||||
throw Exception(ErrorCodes::CANNOT_CONVERT_TYPE,
|
||||
"The type '{}' of an input column '{}' is not supported for conversion from {} data format.",
|
||||
arrow_type->name(), column_name, format_name);
|
||||
if (!status.ok())
|
||||
throw Exception{ErrorCodes::UNKNOWN_EXCEPTION, "Error with a {} column '{}': {}.", format_name, column_name, status.ToString()};
|
||||
}
|
||||
|
||||
ArrowColumnToCHColumn::ArrowColumnToCHColumn(const Block & header_, std::shared_ptr<arrow::Schema> schema_, const std::string & format_name_)
|
||||
: header(header_), format_name(format_name_)
|
||||
static Block arrowSchemaToCHHeader(const arrow::Schema & schema, const std::string & format_name)
|
||||
{
|
||||
for (const auto & field : schema_->fields())
|
||||
ColumnsWithTypeAndName sample_columns;
|
||||
for (const auto & field : schema.fields())
|
||||
{
|
||||
if (header.has(field->name()))
|
||||
{
|
||||
const auto column_type = recursiveRemoveLowCardinality(header.getByName(field->name()).type);
|
||||
name_to_internal_type[field->name()] = getInternalType(field->type(), column_type, field->name(), format_name);
|
||||
}
|
||||
/// Create empty arrow column by it's type and convert it to ClickHouse column.
|
||||
arrow::MemoryPool* pool = arrow::default_memory_pool();
|
||||
std::unique_ptr<arrow::ArrayBuilder> array_builder;
|
||||
arrow::Status status = MakeBuilder(pool, field->type(), &array_builder);
|
||||
checkStatus(status, field->name(), format_name);
|
||||
std::shared_ptr<arrow::Array> arrow_array;
|
||||
status = array_builder->Finish(&arrow_array);
|
||||
checkStatus(status, field->name(), format_name);
|
||||
arrow::ArrayVector array_vector = {arrow_array};
|
||||
auto arrow_column = std::make_shared<arrow::ChunkedArray>(array_vector);
|
||||
std::unordered_map<std::string, std::shared_ptr<ColumnWithTypeAndName>> dict_values;
|
||||
ColumnWithTypeAndName sample_column = readColumnFromArrowColumn(arrow_column, field->name(), format_name, false, dict_values);
|
||||
sample_columns.emplace_back(std::move(sample_column));
|
||||
}
|
||||
return Block(std::move(sample_columns));
|
||||
}
|
||||
|
||||
ArrowColumnToCHColumn::ArrowColumnToCHColumn(
|
||||
const arrow::Schema & schema, const std::string & format_name_, bool import_nested_)
|
||||
: header(arrowSchemaToCHHeader(schema, format_name_)), format_name(format_name_), import_nested(import_nested_)
|
||||
{
|
||||
}
|
||||
|
||||
ArrowColumnToCHColumn::ArrowColumnToCHColumn(
|
||||
const Block & header_, const std::string & format_name_, bool import_nested_)
|
||||
: header(header_), format_name(format_name_), import_nested(import_nested_)
|
||||
{
|
||||
}
|
||||
|
||||
void ArrowColumnToCHColumn::arrowTableToCHChunk(Chunk & res, std::shared_ptr<arrow::Table> & table)
|
||||
@ -645,31 +530,48 @@ void ArrowColumnToCHColumn::arrowTableToCHChunk(Chunk & res, std::shared_ptr<arr
|
||||
using NameToColumnPtr = std::unordered_map<std::string, std::shared_ptr<arrow::ChunkedArray>>;
|
||||
|
||||
NameToColumnPtr name_to_column_ptr;
|
||||
for (const auto & column_name : table->ColumnNames())
|
||||
for (const auto& column_name : table->ColumnNames())
|
||||
{
|
||||
std::shared_ptr<arrow::ChunkedArray> arrow_column = table->GetColumnByName(column_name);
|
||||
name_to_column_ptr[column_name] = arrow_column;
|
||||
}
|
||||
|
||||
std::unordered_map<String, BlockPtr> nested_tables;
|
||||
for (size_t column_i = 0, columns = header.columns(); column_i < columns; ++column_i)
|
||||
{
|
||||
const ColumnWithTypeAndName & header_column = header.getByPosition(column_i);
|
||||
|
||||
if (name_to_column_ptr.find(header_column.name) == name_to_column_ptr.end())
|
||||
bool read_from_nested = false;
|
||||
String nested_table_name = Nested::extractTableName(header_column.name);
|
||||
if (!name_to_column_ptr.contains(header_column.name))
|
||||
{
|
||||
/// Check if it's a column from nested table.
|
||||
if (import_nested && name_to_column_ptr.contains(nested_table_name))
|
||||
{
|
||||
if (!nested_tables.contains(nested_table_name))
|
||||
{
|
||||
std::shared_ptr<arrow::ChunkedArray> arrow_column = name_to_column_ptr[nested_table_name];
|
||||
ColumnsWithTypeAndName cols = {readColumnFromArrowColumn(arrow_column, nested_table_name, format_name, false, dictionary_values)};
|
||||
Block block(cols);
|
||||
nested_tables[nested_table_name] = std::make_shared<Block>(Nested::flatten(block));
|
||||
}
|
||||
|
||||
read_from_nested = nested_tables[nested_table_name]->has(header_column.name);
|
||||
}
|
||||
|
||||
|
||||
// TODO: What if some columns were not presented? Insert NULLs? What if a column is not nullable?
|
||||
throw Exception(ErrorCodes::THERE_IS_NO_COLUMN,
|
||||
"Column '{}' is not presented in input data.", header_column.name);
|
||||
if (!read_from_nested)
|
||||
throw Exception{ErrorCodes::THERE_IS_NO_COLUMN, "Column '{}' is not presented in input data.", header_column.name};
|
||||
}
|
||||
|
||||
std::shared_ptr<arrow::ChunkedArray> arrow_column = name_to_column_ptr[header_column.name];
|
||||
|
||||
DataTypePtr & internal_type = name_to_internal_type[header_column.name];
|
||||
MutableColumnPtr read_column = internal_type->createColumn();
|
||||
readColumnFromArrowColumn(arrow_column, *read_column, header_column.name, format_name, false, dictionary_values);
|
||||
|
||||
ColumnWithTypeAndName column;
|
||||
column.name = header_column.name;
|
||||
column.type = internal_type;
|
||||
column.column = std::move(read_column);
|
||||
if (read_from_nested)
|
||||
column = nested_tables[nested_table_name]->getByName(header_column.name);
|
||||
else
|
||||
column = readColumnFromArrowColumn(arrow_column, header_column.name, format_name, false, dictionary_values);
|
||||
|
||||
column.column = castColumn(column, header_column.type);
|
||||
column.type = header_column.type;
|
||||
@ -681,5 +583,5 @@ void ArrowColumnToCHColumn::arrowTableToCHChunk(Chunk & res, std::shared_ptr<arr
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
|
@ -7,6 +7,8 @@
|
||||
#if USE_ARROW || USE_ORC || USE_PARQUET
|
||||
|
||||
#include <DataTypes/IDataType.h>
|
||||
#include <Core/ColumnWithTypeAndName.h>
|
||||
#include <Core/Block.h>
|
||||
#include <arrow/table.h>
|
||||
|
||||
|
||||
@ -19,19 +21,23 @@ class Chunk;
|
||||
class ArrowColumnToCHColumn
|
||||
{
|
||||
public:
|
||||
ArrowColumnToCHColumn(const Block & header_, std::shared_ptr<arrow::Schema> schema_, const std::string & format_name_);
|
||||
ArrowColumnToCHColumn(const Block & header_, const std::string & format_name_, bool import_nested_);
|
||||
|
||||
/// Constructor that create header by arrow schema. It will be useful for inserting
|
||||
/// data from file without knowing table structure.
|
||||
ArrowColumnToCHColumn(const arrow::Schema & schema, const std::string & format_name, bool import_nested_);
|
||||
|
||||
void arrowTableToCHChunk(Chunk & res, std::shared_ptr<arrow::Table> & table);
|
||||
|
||||
private:
|
||||
const Block & header;
|
||||
std::unordered_map<std::string, DataTypePtr> name_to_internal_type;
|
||||
const Block header;
|
||||
const std::string format_name;
|
||||
bool import_nested;
|
||||
|
||||
/// Map {column name : dictionary column}.
|
||||
/// To avoid converting dictionary from Arrow Dictionary
|
||||
/// to LowCardinality every chunk we save it and reuse.
|
||||
std::unordered_map<std::string, ColumnPtr> dictionary_values;
|
||||
std::unordered_map<std::string, std::shared_ptr<ColumnWithTypeAndName>> dictionary_values;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -46,7 +46,7 @@
|
||||
M(INT64, arrow::Int64Type) \
|
||||
M(FLOAT, arrow::FloatType) \
|
||||
M(DOUBLE, arrow::DoubleType) \
|
||||
M(STRING, arrow::StringType)
|
||||
M(BINARY, arrow::BinaryType)
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -72,6 +72,7 @@ namespace DB
|
||||
|
||||
{"Date", arrow::uint16()}, /// uint16 is used instead of date32, because Apache Arrow cannot correctly serialize Date32Array.
|
||||
{"DateTime", arrow::uint32()}, /// uint32 is used instead of date64, because we don't need milliseconds.
|
||||
{"Date32", arrow::date32()},
|
||||
|
||||
{"String", arrow::binary()},
|
||||
{"FixedString", arrow::binary()},
|
||||
@ -295,6 +296,7 @@ namespace DB
|
||||
FOR_ARROW_TYPES(DISPATCH)
|
||||
#undef DISPATCH
|
||||
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot fill arrow array with {} data.", column_type->getName());
|
||||
}
|
||||
|
||||
template <typename ColumnType>
|
||||
@ -334,7 +336,6 @@ namespace DB
|
||||
size_t end)
|
||||
{
|
||||
const PaddedPODArray<UInt16> & internal_data = assert_cast<const ColumnVector<UInt16> &>(*write_column).getData();
|
||||
//arrow::Date32Builder date_builder;
|
||||
arrow::UInt16Builder & builder = assert_cast<arrow::UInt16Builder &>(*array_builder);
|
||||
arrow::Status status;
|
||||
|
||||
@ -343,7 +344,6 @@ namespace DB
|
||||
if (null_bytemap && (*null_bytemap)[value_i])
|
||||
status = builder.AppendNull();
|
||||
else
|
||||
/// Implicitly converts UInt16 to Int32
|
||||
status = builder.Append(internal_data[value_i]);
|
||||
checkStatus(status, write_column->getName(), format_name);
|
||||
}
|
||||
@ -372,6 +372,28 @@ namespace DB
|
||||
}
|
||||
}
|
||||
|
||||
static void fillArrowArrayWithDate32ColumnData(
|
||||
ColumnPtr write_column,
|
||||
const PaddedPODArray<UInt8> * null_bytemap,
|
||||
const String & format_name,
|
||||
arrow::ArrayBuilder* array_builder,
|
||||
size_t start,
|
||||
size_t end)
|
||||
{
|
||||
const PaddedPODArray<Int32> & internal_data = assert_cast<const ColumnVector<Int32> &>(*write_column).getData();
|
||||
arrow::Date32Builder & builder = assert_cast<arrow::Date32Builder &>(*array_builder);
|
||||
arrow::Status status;
|
||||
|
||||
for (size_t value_i = start; value_i < end; ++value_i)
|
||||
{
|
||||
if (null_bytemap && (*null_bytemap)[value_i])
|
||||
status = builder.AppendNull();
|
||||
else
|
||||
status = builder.Append(internal_data[value_i]);
|
||||
checkStatus(status, write_column->getName(), format_name);
|
||||
}
|
||||
}
|
||||
|
||||
static void fillArrowArray(
|
||||
const String & column_name,
|
||||
ColumnPtr & column,
|
||||
@ -410,6 +432,10 @@ namespace DB
|
||||
{
|
||||
fillArrowArrayWithDateTimeColumnData(column, null_bytemap, format_name, array_builder, start, end);
|
||||
}
|
||||
else if (isDate32(column_type))
|
||||
{
|
||||
fillArrowArrayWithDate32ColumnData(column, null_bytemap, format_name, array_builder, start, end);
|
||||
}
|
||||
else if (isArray(column_type))
|
||||
{
|
||||
fillArrowArrayWithArrayColumnData<arrow::ListBuilder>(column_name, column, column_type, null_bytemap, array_builder, format_name, start, end, dictionary_values);
|
||||
|
@ -9,6 +9,7 @@
|
||||
#include <arrow/io/memory.h>
|
||||
#include "ArrowBufferedStreams.h"
|
||||
#include "ArrowColumnToCHColumn.h"
|
||||
#include <DataTypes/NestedUtils.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -26,7 +27,8 @@ namespace ErrorCodes
|
||||
throw Exception(_s.ToString(), ErrorCodes::BAD_ARGUMENTS); \
|
||||
} while (false)
|
||||
|
||||
ORCBlockInputFormat::ORCBlockInputFormat(ReadBuffer & in_, Block header_) : IInputFormat(std::move(header_), in_)
|
||||
ORCBlockInputFormat::ORCBlockInputFormat(ReadBuffer & in_, Block header_, const FormatSettings & format_settings_)
|
||||
: IInputFormat(std::move(header_), in_), format_settings(format_settings_)
|
||||
{
|
||||
}
|
||||
|
||||
@ -98,7 +100,11 @@ void ORCBlockInputFormat::prepareReader()
|
||||
std::shared_ptr<arrow::Schema> schema;
|
||||
THROW_ARROW_NOT_OK(file_reader->ReadSchema(&schema));
|
||||
|
||||
arrow_column_to_ch_column = std::make_unique<ArrowColumnToCHColumn>(getPort().getHeader(), schema, "ORC");
|
||||
arrow_column_to_ch_column = std::make_unique<ArrowColumnToCHColumn>(getPort().getHeader(), "ORC", format_settings.orc.import_nested);
|
||||
|
||||
std::unordered_set<String> nested_table_names;
|
||||
if (format_settings.orc.import_nested)
|
||||
nested_table_names = Nested::getAllTableNames(getPort().getHeader());
|
||||
|
||||
/// In ReadStripe column indices should be started from 1,
|
||||
/// because 0 indicates to select all columns.
|
||||
@ -108,7 +114,8 @@ void ORCBlockInputFormat::prepareReader()
|
||||
/// LIST type require 2 indices, STRUCT - the number of elements + 1,
|
||||
/// so we should recursively count the number of indices we need for this type.
|
||||
int indexes_count = countIndicesForType(schema->field(i)->type());
|
||||
if (getPort().getHeader().has(schema->field(i)->name()))
|
||||
const auto & name = schema->field(i)->name();
|
||||
if (getPort().getHeader().has(name) || nested_table_names.contains(name))
|
||||
{
|
||||
for (int j = 0; j != indexes_count; ++j)
|
||||
include_indices.push_back(index + j);
|
||||
@ -124,9 +131,9 @@ void registerInputFormatProcessorORC(FormatFactory &factory)
|
||||
[](ReadBuffer &buf,
|
||||
const Block &sample,
|
||||
const RowInputFormatParams &,
|
||||
const FormatSettings & /* settings */)
|
||||
const FormatSettings & settings)
|
||||
{
|
||||
return std::make_shared<ORCBlockInputFormat>(buf, sample);
|
||||
return std::make_shared<ORCBlockInputFormat>(buf, sample, settings);
|
||||
});
|
||||
factory.markFormatAsColumnOriented("ORC");
|
||||
}
|
||||
|
@ -3,6 +3,7 @@
|
||||
#if USE_ORC
|
||||
|
||||
#include <Processors/Formats/IInputFormat.h>
|
||||
#include <Formats/FormatSettings.h>
|
||||
|
||||
namespace arrow::adapters::orc { class ORCFileReader; }
|
||||
|
||||
@ -14,7 +15,7 @@ class ArrowColumnToCHColumn;
|
||||
class ORCBlockInputFormat : public IInputFormat
|
||||
{
|
||||
public:
|
||||
ORCBlockInputFormat(ReadBuffer & in_, Block header_);
|
||||
ORCBlockInputFormat(ReadBuffer & in_, Block header_, const FormatSettings & format_settings_);
|
||||
|
||||
String getName() const override { return "ORCBlockInputFormat"; }
|
||||
|
||||
@ -38,6 +39,8 @@ private:
|
||||
// indices of columns to read from ORC file
|
||||
std::vector<int> include_indices;
|
||||
|
||||
const FormatSettings format_settings;
|
||||
|
||||
void prepareReader();
|
||||
};
|
||||
|
||||
|
@ -11,6 +11,7 @@
|
||||
#include <parquet/file_reader.h>
|
||||
#include "ArrowBufferedStreams.h"
|
||||
#include "ArrowColumnToCHColumn.h"
|
||||
#include <DataTypes/NestedUtils.h>
|
||||
|
||||
#include <common/logger_useful.h>
|
||||
|
||||
@ -30,8 +31,8 @@ namespace ErrorCodes
|
||||
throw Exception(_s.ToString(), ErrorCodes::BAD_ARGUMENTS); \
|
||||
} while (false)
|
||||
|
||||
ParquetBlockInputFormat::ParquetBlockInputFormat(ReadBuffer & in_, Block header_)
|
||||
: IInputFormat(std::move(header_), in_)
|
||||
ParquetBlockInputFormat::ParquetBlockInputFormat(ReadBuffer & in_, Block header_, const FormatSettings & format_settings_)
|
||||
: IInputFormat(std::move(header_), in_), format_settings(format_settings_)
|
||||
{
|
||||
}
|
||||
|
||||
@ -98,7 +99,11 @@ void ParquetBlockInputFormat::prepareReader()
|
||||
std::shared_ptr<arrow::Schema> schema;
|
||||
THROW_ARROW_NOT_OK(file_reader->GetSchema(&schema));
|
||||
|
||||
arrow_column_to_ch_column = std::make_unique<ArrowColumnToCHColumn>(getPort().getHeader(), schema, "Parquet");
|
||||
arrow_column_to_ch_column = std::make_unique<ArrowColumnToCHColumn>(getPort().getHeader(), "Parquet", format_settings.parquet.import_nested);
|
||||
|
||||
std::unordered_set<String> nested_table_names;
|
||||
if (format_settings.parquet.import_nested)
|
||||
nested_table_names = Nested::getAllTableNames(getPort().getHeader());
|
||||
|
||||
int index = 0;
|
||||
for (int i = 0; i < schema->num_fields(); ++i)
|
||||
@ -107,7 +112,8 @@ void ParquetBlockInputFormat::prepareReader()
|
||||
/// nested elements, so we should recursively
|
||||
/// count the number of indices we need for this type.
|
||||
int indexes_count = countIndicesForType(schema->field(i)->type());
|
||||
if (getPort().getHeader().has(schema->field(i)->name()))
|
||||
const auto & name = schema->field(i)->name();
|
||||
if (getPort().getHeader().has(name) || nested_table_names.contains(name))
|
||||
{
|
||||
for (int j = 0; j != indexes_count; ++j)
|
||||
column_indices.push_back(index + j);
|
||||
@ -123,9 +129,9 @@ void registerInputFormatProcessorParquet(FormatFactory &factory)
|
||||
[](ReadBuffer &buf,
|
||||
const Block &sample,
|
||||
const RowInputFormatParams &,
|
||||
const FormatSettings & /* settings */)
|
||||
const FormatSettings & settings)
|
||||
{
|
||||
return std::make_shared<ParquetBlockInputFormat>(buf, sample);
|
||||
return std::make_shared<ParquetBlockInputFormat>(buf, sample, settings);
|
||||
});
|
||||
factory.markFormatAsColumnOriented("Parquet");
|
||||
}
|
||||
|
@ -4,6 +4,7 @@
|
||||
#if USE_PARQUET
|
||||
|
||||
#include <Processors/Formats/IInputFormat.h>
|
||||
#include <Formats/FormatSettings.h>
|
||||
|
||||
namespace parquet::arrow { class FileReader; }
|
||||
|
||||
@ -17,7 +18,7 @@ class ArrowColumnToCHColumn;
|
||||
class ParquetBlockInputFormat : public IInputFormat
|
||||
{
|
||||
public:
|
||||
ParquetBlockInputFormat(ReadBuffer & in_, Block header_);
|
||||
ParquetBlockInputFormat(ReadBuffer & in_, Block header_, const FormatSettings & format_settings_);
|
||||
|
||||
void resetParser() override;
|
||||
|
||||
@ -36,6 +37,7 @@ private:
|
||||
std::vector<int> column_indices;
|
||||
std::unique_ptr<ArrowColumnToCHColumn> arrow_column_to_ch_column;
|
||||
int row_group_current = 0;
|
||||
const FormatSettings format_settings;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -315,15 +315,15 @@ Code: 33. DB::ParsingEx---tion: Error while reading Parquet data: IOError: Not y
|
||||
1593604801 abc 42.125
|
||||
1593604801 def 7.7
|
||||
=== Try load data from nonnullable.impala.parquet
|
||||
8 [-1] [[-1,-2],[]] {'k1':-1} [{},{'k1':1},{},{}] (-1,[-1],([[(-1)]]),{})
|
||||
8 [-1] [[-1,-2],[]] {'k1':-1} [{},{'k1':1},{},{}] (-1,[-1],([[(-1,'nonnullable')]]),{})
|
||||
=== Try load data from nullable.impala.parquet
|
||||
1 [1,2,3] [[1,2],[3,4]] {'k1':1,'k2':100} [{'k1':1}] (1,[1],([[(10),(-10)],[(11)]]),{'foo':(([1.1]))})
|
||||
2 [NULL,1,2,NULL,3,NULL] [[NULL,1,2,NULL],[3,NULL,4],[],[]] {'k1':2,'k2':NULL} [{'k3':NULL,'k1':1},{},{}] (NULL,[NULL],([[(NULL),(10),(NULL),(-10),(NULL)],[(11),(NULL)],[],[]]),{'g1':(([2.2,NULL])),'g2':(([])),'g3':(([])),'g4':(([])),'g5':(([]))})
|
||||
1 [1,2,3] [[1,2],[3,4]] {'k1':1,'k2':100} [{'k1':1}] (1,[1],([[(10,'aaa'),(-10,'bbb')],[(11,'c')]]),{'foo':(([1.1]))})
|
||||
2 [NULL,1,2,NULL,3,NULL] [[NULL,1,2,NULL],[3,NULL,4],[],[]] {'k1':2,'k2':NULL} [{'k3':NULL,'k1':1},{},{}] (NULL,[NULL],([[(NULL,NULL),(10,'aaa'),(NULL,NULL),(-10,'bbb'),(NULL,NULL)],[(11,'c'),(NULL,NULL)],[],[]]),{'g1':(([2.2,NULL])),'g2':(([])),'g3':(([])),'g4':(([])),'g5':(([]))})
|
||||
3 [] [[]] {} [{},{}] (NULL,[],([]),{})
|
||||
4 [] [] {} [] (NULL,[],([]),{})
|
||||
5 [] [] {} [] (NULL,[],([]),{'foo':(([2.2,3.3]))})
|
||||
6 [] [] {} [] (NULL,[],([]),{})
|
||||
7 [] [[],[5,6]] {'k1':NULL,'k3':NULL} [] (7,[2,3,NULL],([[],[(NULL)],[]]),{})
|
||||
7 [] [[],[5,6]] {'k1':NULL,'k3':NULL} [] (7,[2,3,NULL],([[],[(NULL,NULL)],[]]),{})
|
||||
=== Try load data from nullable_list.parquet
|
||||
[1,NULL,2] [NULL,'Some string',NULL] [0.00,NULL,42.42]
|
||||
[NULL] [NULL] [NULL]
|
||||
|
@ -0,0 +1,21 @@
|
||||
Arrow
|
||||
[1,2,3] ['123','456','789'] [9.8,10.12,11.14]
|
||||
[4,5,6] ['101112','131415','161718'] [123.8,10.2,11.414]
|
||||
[7,8,9] ['101','415','118'] [13.08,1.12,0.414]
|
||||
[1,2,3] ['123','456','789'] [9.8,10.12,11.14] [[(1,'123',9.8),(2,'456',10.12),(3,'789',11.14)],[(4,'101112',123.8),(5,'131415',10.2),(6,'161718',11.414)],[(7,'101',13.08),(8,'415',1.12),(9,'118',0.414)]]
|
||||
[4,5,6] ['101112','131415','161718'] [123.8,10.2,11.414] [[(1,'123',9.8),(2,'456',10.12),(3,'789',11.14)],[(4,'101112',123.8),(5,'131415',10.2),(6,'161718',11.414)],[(7,'101',13.08),(8,'415',1.12),(9,'118',0.414)]]
|
||||
[7,8,9] ['101','415','118'] [13.08,1.12,0.414] [[(1,'123',9.8),(2,'456',10.12),(3,'789',11.14)],[(4,'101112',123.8),(5,'131415',10.2),(6,'161718',11.414)],[(7,'101',13.08),(8,'415',1.12),(9,'118',0.414)]]
|
||||
Parquet
|
||||
[1,2,3] ['123','456','789'] [9.8,10.12,11.14]
|
||||
[4,5,6] ['101112','131415','161718'] [123.8,10.2,11.414]
|
||||
[7,8,9] ['101','415','118'] [13.08,1.12,0.414]
|
||||
[1,2,3] ['123','456','789'] [9.8,10.12,11.14] [[(1,'123',9.8),(2,'456',10.12),(3,'789',11.14)],[(4,'101112',123.8),(5,'131415',10.2),(6,'161718',11.414)],[(7,'101',13.08),(8,'415',1.12),(9,'118',0.414)]]
|
||||
[4,5,6] ['101112','131415','161718'] [123.8,10.2,11.414] [[(1,'123',9.8),(2,'456',10.12),(3,'789',11.14)],[(4,'101112',123.8),(5,'131415',10.2),(6,'161718',11.414)],[(7,'101',13.08),(8,'415',1.12),(9,'118',0.414)]]
|
||||
[7,8,9] ['101','415','118'] [13.08,1.12,0.414] [[(1,'123',9.8),(2,'456',10.12),(3,'789',11.14)],[(4,'101112',123.8),(5,'131415',10.2),(6,'161718',11.414)],[(7,'101',13.08),(8,'415',1.12),(9,'118',0.414)]]
|
||||
ORC
|
||||
[1,2,3] ['123','456','789'] [9.8,10.12,11.14]
|
||||
[4,5,6] ['101112','131415','161718'] [123.8,10.2,11.414]
|
||||
[7,8,9] ['101','415','118'] [13.08,1.12,0.414]
|
||||
[1,2,3] ['123','456','789'] [9.8,10.12,11.14] [[(1,'123',9.8),(2,'456',10.12),(3,'789',11.14)],[(4,'101112',123.8),(5,'131415',10.2),(6,'161718',11.414)],[(7,'101',13.08),(8,'415',1.12),(9,'118',0.414)]]
|
||||
[4,5,6] ['101112','131415','161718'] [123.8,10.2,11.414] [[(1,'123',9.8),(2,'456',10.12),(3,'789',11.14)],[(4,'101112',123.8),(5,'131415',10.2),(6,'161718',11.414)],[(7,'101',13.08),(8,'415',1.12),(9,'118',0.414)]]
|
||||
[7,8,9] ['101','415','118'] [13.08,1.12,0.414] [[(1,'123',9.8),(2,'456',10.12),(3,'789',11.14)],[(4,'101112',123.8),(5,'131415',10.2),(6,'161718',11.414)],[(7,'101',13.08),(8,'415',1.12),(9,'118',0.414)]]
|
36
tests/queries/0_stateless/00900_orc_arrow_parquet_nested.sh
Executable file
36
tests/queries/0_stateless/00900_orc_arrow_parquet_nested.sh
Executable file
@ -0,0 +1,36 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
# shellcheck source=../shell_config.sh
|
||||
. "$CUR_DIR"/../shell_config.sh
|
||||
|
||||
${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS nested_table"
|
||||
${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS nested_nested_table"
|
||||
|
||||
${CLICKHOUSE_CLIENT} --query="CREATE TABLE nested_table (table Nested(elem1 Int32, elem2 String, elem3 Float32)) engine=Memory"
|
||||
|
||||
${CLICKHOUSE_CLIENT} --query="CREATE TABLE nested_nested_table (table Nested(elem1 Int32, elem2 String, elem3 Float32, nested Nested(elem1 Int32, elem2 String, elem3 Float32))) engine=Memory"
|
||||
|
||||
|
||||
formats=('Arrow' 'Parquet' 'ORC')
|
||||
format_files=('arrow' 'parquet' 'orc')
|
||||
|
||||
for ((i = 0; i < 3; i++)) do
|
||||
echo ${formats[i]}
|
||||
|
||||
${CLICKHOUSE_CLIENT} --query="TRUNCATE TABLE nested_table"
|
||||
cat $CUR_DIR/data_orc_arrow_parquet_nested/nested_table.${format_files[i]} | ${CLICKHOUSE_CLIENT} -q "INSERT INTO nested_table FORMAT ${formats[i]} SETTINGS input_format_${format_files[i]}_import_nested = 1"
|
||||
|
||||
${CLICKHOUSE_CLIENT} --query="SELECT * FROM nested_table"
|
||||
|
||||
|
||||
${CLICKHOUSE_CLIENT} --query="TRUNCATE TABLE nested_nested_table"
|
||||
cat $CUR_DIR/data_orc_arrow_parquet_nested/nested_nested_table.${format_files[i]} | ${CLICKHOUSE_CLIENT} -q "INSERT INTO nested_nested_table FORMAT ${formats[i]} SETTINGS input_format_${format_files[i]}_import_nested = 1"
|
||||
|
||||
${CLICKHOUSE_CLIENT} --query="SELECT * FROM nested_nested_table"
|
||||
|
||||
|
||||
done
|
||||
|
||||
${CLICKHOUSE_CLIENT} --query="DROP TABLE nested_table"
|
||||
${CLICKHOUSE_CLIENT} --query="DROP TABLE nested_nested_table"
|
@ -1,2 +1,5 @@
|
||||
1 ['a','b','c'] ('z','6')
|
||||
2 ['d','e'] ('x','9')
|
||||
1 ['a','b','c'] ('z','6')
|
||||
2 ['d','e'] ('x','9')
|
||||
20000000
|
||||
|
@ -20,5 +20,7 @@ ${CLICKHOUSE_CLIENT} --query="SELECT * FROM arrow_dicts FORMAT Arrow SETTINGS ou
|
||||
|
||||
cat "${CLICKHOUSE_TMP}"/dicts.arrow | ${CLICKHOUSE_CLIENT} -q "INSERT INTO arrow_dicts FORMAT Arrow"
|
||||
|
||||
${CLICKHOUSE_CLIENT} --query="SELECT count() FROM arrow_dicts"
|
||||
|
||||
${CLICKHOUSE_CLIENT} --query="DROP TABLE arrow_dicts"
|
||||
|
||||
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@ -1 +1 @@
|
||||
`ID` Nullable(Int64), `Int_Array` Array(Nullable(Int32)), `int_array_array` Array(Array(Nullable(Int32))), `Int_Map` Map(String, Nullable(Int32)), `int_map_array` Array(Map(String, Nullable(Int32))), `nested_Struct` Tuple(Nullable(Int32), Array(Nullable(Int32)), Tuple(Array(Array(Tuple(Nullable(Int32))))), Map(String, Tuple(Tuple(Array(Nullable(Float64))))))
|
||||
`ID` Nullable(Int64), `Int_Array` Array(Nullable(Int32)), `int_array_array` Array(Array(Nullable(Int32))), `Int_Map` Map(String, Nullable(Int32)), `int_map_array` Array(Map(String, Nullable(Int32))), `nested_Struct` Tuple(Nullable(Int32), Array(Nullable(Int32)), Tuple(Array(Array(Tuple(Nullable(Int32), Nullable(String))))), Map(String, Tuple(Tuple(Array(Nullable(Float64))))))
|
||||
|
@ -1 +1 @@
|
||||
`id` Nullable(Int64), `int_array` Array(Nullable(Int32)), `int_array_Array` Array(Array(Nullable(Int32))), `int_map` Map(String, Nullable(Int32)), `int_Map_Array` Array(Map(String, Nullable(Int32))), `nested_struct` Tuple(Nullable(Int32), Array(Nullable(Int32)), Tuple(Array(Array(Tuple(Nullable(Int32))))), Map(String, Tuple(Tuple(Array(Nullable(Float64))))))
|
||||
`id` Nullable(Int64), `int_array` Array(Nullable(Int32)), `int_array_Array` Array(Array(Nullable(Int32))), `int_map` Map(String, Nullable(Int32)), `int_Map_Array` Array(Map(String, Nullable(Int32))), `nested_struct` Tuple(Nullable(Int32), Array(Nullable(Int32)), Tuple(Array(Array(Tuple(Nullable(Int32), Nullable(String))))), Map(String, Tuple(Tuple(Array(Nullable(Float64))))))
|
||||
|
BIN
tests/queries/0_stateless/dicts.arrow
Normal file
BIN
tests/queries/0_stateless/dicts.arrow
Normal file
Binary file not shown.
0
tests/queries/0_stateless/maps
Normal file
0
tests/queries/0_stateless/maps
Normal file
Loading…
Reference in New Issue
Block a user