fixed due to review

This commit is contained in:
yariks5s 2023-10-27 15:43:03 +00:00
parent a1b9ab5877
commit 23635352f1
4 changed files with 374 additions and 184 deletions

View File

@ -2448,7 +2448,22 @@ Result:
## Npy {#data-format-npy}
This function is designed to load a NumPy array from a .npy file into ClickHouse. The NumPy file format is a binary format used for efficiently storing arrays of numerical data. It stores all the top-level dimension objects as a separate column.
This function is designed to load a NumPy array from a .npy file into ClickHouse. The NumPy file format is a binary format used for efficiently storing arrays of numerical data. During import, ClickHouse treats the top-level dimension as an array of rows with a single column. Supported Npy data types and their corresponding types in ClickHouse:
| Npy type | ClickHouse type |
|:--------:|:---------------:|
| b1 | UInt8 |
| i1 | Int8 |
| i2 | Int16 |
| i4 | Int32 |
| i8 | Int64 |
| u1 | UInt8 |
| u2 | UInt16 |
| u4 | UInt32 |
| u8 | UInt64 |
| f4 | Float32 |
| f8 | Float64 |
| S | String |
| U | String |
**Example**

View File

@ -0,0 +1,108 @@
#include <cstddef>
#include <Storages/NamedCollectionsHelpers.h>
/// Tag returned by NumpyDataType::getTypeIndex() to identify the concrete element type.
enum class NumpyDataTypeIndex
{
/// Integer tags: returned by NumpyDataTypeInt for sizes 1, 2, 4 and 8 (signed, then unsigned).
Int8,
Int16,
Int32,
Int64,
UInt8,
UInt16,
UInt32,
UInt64,
/// Floating-point tags: returned by NumpyDataTypeFloat for sizes 4 and 8.
Float32,
Float64,
/// Returned by NumpyDataTypeString.
String,
/// Returned by NumpyDataTypeUnicode.
Unicode,
};
/// Abstract description of a NumPy element type.
/// Holds the byte-order tag; concrete subclasses supply the type tag and the size.
class NumpyDataType
{
public:
    /// Byte-order tag attached to the type.
    enum Endianness
    {
        LITTLE,
        BIG,
        NONE,
    };

    explicit NumpyDataType(Endianness endianess_)
        : endianness{endianess_}
    {
    }

    virtual ~NumpyDataType() = default;

    /// Returns the byte-order tag given at construction.
    Endianness getEndianness() const
    {
        return endianness;
    }

    /// Type tag of the concrete subclass.
    virtual NumpyDataTypeIndex getTypeIndex() const = 0;

    /// Size value reported by the concrete subclass.
    virtual size_t getSize() const = 0;

private:
    Endianness endianness;
};
/// Integer NumPy element type, described by its size and signedness.
class NumpyDataTypeInt : public NumpyDataType
{
public:
    NumpyDataTypeInt(Endianness endianess, size_t size_, bool is_signed_)
        : NumpyDataType(endianess)
        , byte_width(size_)
        , signed_flag(is_signed_)
    {
    }

    /// Maps (size, signedness) to the matching tag.
    /// Throws BAD_ARGUMENTS when the size is not 1, 2, 4 or 8.
    NumpyDataTypeIndex getTypeIndex() const override
    {
        if (byte_width == 1)
            return signed_flag ? NumpyDataTypeIndex::Int8 : NumpyDataTypeIndex::UInt8;
        if (byte_width == 2)
            return signed_flag ? NumpyDataTypeIndex::Int16 : NumpyDataTypeIndex::UInt16;
        if (byte_width == 4)
            return signed_flag ? NumpyDataTypeIndex::Int32 : NumpyDataTypeIndex::UInt32;
        if (byte_width == 8)
            return signed_flag ? NumpyDataTypeIndex::Int64 : NumpyDataTypeIndex::UInt64;

        throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Incorrect int type with size {}", byte_width);
    }

    /// Size value given at construction.
    size_t getSize() const override
    {
        return byte_width;
    }

    bool isSigned() const
    {
        return signed_flag;
    }

private:
    size_t byte_width;
    bool signed_flag;
};
/// Floating-point NumPy element type, described by its size.
class NumpyDataTypeFloat : public NumpyDataType
{
public:
    NumpyDataTypeFloat(Endianness endianess, size_t size_)
        : NumpyDataType(endianess)
        , byte_width(size_)
    {
    }

    /// Returns Float32 for size 4 and Float64 for size 8.
    /// Throws BAD_ARGUMENTS for any other size.
    NumpyDataTypeIndex getTypeIndex() const override
    {
        if (byte_width == 4)
            return NumpyDataTypeIndex::Float32;
        if (byte_width == 8)
            return NumpyDataTypeIndex::Float64;

        throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Incorrect float type with size {}", byte_width);
    }

    /// Size value given at construction.
    size_t getSize() const override
    {
        return byte_width;
    }

private:
    size_t byte_width;
};
/// NumPy string element type; always reports the String tag.
class NumpyDataTypeString : public NumpyDataType
{
public:
    NumpyDataTypeString(Endianness endianess, size_t size_)
        : NumpyDataType(endianess)
        , length(size_)
    {
    }

    NumpyDataTypeIndex getTypeIndex() const override
    {
        return NumpyDataTypeIndex::String;
    }

    /// Size value given at construction.
    size_t getSize() const override
    {
        return length;
    }

private:
    size_t length;
};
/// NumPy unicode element type; always reports the Unicode tag.
class NumpyDataTypeUnicode : public NumpyDataType
{
public:
    NumpyDataTypeUnicode(Endianness endianess, size_t size_)
        : NumpyDataType(endianess)
        , length(size_)
    {
    }

    NumpyDataTypeIndex getTypeIndex() const override
    {
        return NumpyDataTypeIndex::Unicode;
    }

    /// Size value given at construction, returned unscaled.
    size_t getSize() const override
    {
        return length;
    }

private:
    size_t length;
};

View File

@ -12,8 +12,12 @@
#include <Formats/EscapingRuleUtils.h>
#include <DataTypes/Serializations/SerializationNullable.h>
#include <DataTypes/DataTypeString.h>
#include <Common/assert_cast.h>
#include <Common/typeid_cast.h>
#include <Common/Exception.h>
#include <Columns/ColumnFixedString.h>
#include <Core/TypeId.h>
#include <Core/Types_fwd.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnsNumber.h>
@ -29,6 +33,7 @@
#include <IO/WriteHelpers.h>
#include <Processors/Formats/IRowInputFormat.h>
#include <base/types.h>
#include <boost/algorithm/string/split.hpp>
namespace DB
@ -39,13 +44,48 @@ namespace ErrorCodes
extern const int INCORRECT_DATA;
extern const int BAD_ARGUMENTS;
extern const int LOGICAL_ERROR;
extern const int UNKNOWN_TYPE;
extern const int ILLEGAL_COLUMN;
}
namespace
{
DataTypePtr createDataType(size_t depth, DataTypePtr nested_type)
/// Returns the ClickHouse data type that corresponds to the given NumPy element type.
/// Both String and Unicode map to DataTypeString.
/// Throws UNKNOWN_TYPE if the tag is not one of the handled values.
DataTypePtr getDataTypeFromNumpyType(const std::shared_ptr<NumpyDataType> & numpy_type)
{
    switch (numpy_type->getTypeIndex())
    {
        case NumpyDataTypeIndex::Int8:    return std::make_shared<DataTypeInt8>();
        case NumpyDataTypeIndex::Int16:   return std::make_shared<DataTypeInt16>();
        case NumpyDataTypeIndex::Int32:   return std::make_shared<DataTypeInt32>();
        case NumpyDataTypeIndex::Int64:   return std::make_shared<DataTypeInt64>();
        case NumpyDataTypeIndex::UInt8:   return std::make_shared<DataTypeUInt8>();
        case NumpyDataTypeIndex::UInt16:  return std::make_shared<DataTypeUInt16>();
        case NumpyDataTypeIndex::UInt32:  return std::make_shared<DataTypeUInt32>();
        case NumpyDataTypeIndex::UInt64:  return std::make_shared<DataTypeUInt64>();
        case NumpyDataTypeIndex::Float32: return std::make_shared<DataTypeFloat32>();
        case NumpyDataTypeIndex::Float64: return std::make_shared<DataTypeFloat64>();
        case NumpyDataTypeIndex::String:
        case NumpyDataTypeIndex::Unicode:
            return std::make_shared<DataTypeString>();
    }

    /// Reached only when the tag holds a value outside the enumerators above.
    throw Exception(ErrorCodes::UNKNOWN_TYPE, "Numpy type {} is not supported", magic_enum::enum_name(numpy_type->getTypeIndex()));
}
DataTypePtr createNestedArrayType(const DataTypePtr & nested_type, size_t depth)
{
DataTypePtr result_type = nested_type;
assert(depth > 0);
if (depth > 1)
{
@ -55,88 +95,70 @@ DataTypePtr createDataType(size_t depth, DataTypePtr nested_type)
return result_type;
}
/*
Checks, in what endian format data was written.
return -1: if data is written in little-endian;
1: if data is written in big-endian;
0: if data is written in no-endian. */
int endianOrientation(String descr)
size_t parseTypeSize(const std::string type)
{
if (descr.length() < 3)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Descr field length must be bigger or equal 3");
if (descr[0] == '<')
return -1;
else if (descr[0] == '>')
return 1;
else if (descr[0] == '|')
return 0;
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Wrong content of field descr");
try
{
size_t size = std::stoi(type);
return size;
}
catch (...)
{
throw Exception(ErrorCodes::INCORRECT_DATA, "Invalid data type");
}
}
DataTypePtr parseType(String type)
std::shared_ptr<NumpyDataType> parseType(String type)
{
if (type == "<i1")
return std::make_shared<DataTypeInt8>();
else if (type == "<i2")
return std::make_shared<DataTypeInt16>();
else if (type == "<i4")
return std::make_shared<DataTypeInt32>();
else if (type == "<i8")
return std::make_shared<DataTypeInt64>();
else if (type == "<u1")
return std::make_shared<DataTypeUInt8>();
else if (type == "<u2")
return std::make_shared<DataTypeUInt16>();
else if (type == "<u4")
return std::make_shared<DataTypeUInt32>();
else if (type == "<u8")
return std::make_shared<DataTypeUInt64>();
else if (type == "<f2")
return std::make_shared<DataTypeFloat32>();
else if (type == "<f4")
return std::make_shared<DataTypeFloat32>();
else if (type == "<f8")
return std::make_shared<DataTypeFloat64>();
else if (type == "<c8" || type == "<c16")
/// Parse endianess
NumpyDataType::Endianness endianess;
if (type[0] == '<')
endianess = NumpyDataType::Endianness::LITTLE;
else if (type[1] == '>')
endianess = NumpyDataType::Endianness::BIG;
else if (type[0] == '|')
endianess = NumpyDataType::Endianness::NONE;
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Wrong header data");
/// Parse type
if (type[1] == 'i')
return std::make_shared<NumpyDataTypeInt>(endianess, parseTypeSize(type.substr(2)), true);
else if (type[1] == 'b')
return std::make_shared<NumpyDataTypeInt>(endianess, parseTypeSize(type.substr(2)), false);
else if (type[1] == 'u')
return std::make_shared<NumpyDataTypeInt>(endianess, parseTypeSize(type.substr(2)), false);
else if (type[1] == 'f')
return std::make_shared<NumpyDataTypeFloat>(endianess, parseTypeSize(type.substr(2)));
else if (type[1] == 'S')
return std::make_shared<NumpyDataTypeString>(endianess, parseTypeSize(type.substr(2)));
else if (type[1] == 'U')
return std::make_shared<NumpyDataTypeUnicode>(endianess, parseTypeSize(type.substr(2)));
else if (type[1] == 'c')
throw Exception(ErrorCodes::BAD_ARGUMENTS, "ClickHouse doesn't support complex numeric type");
else if (type == "|b1")
return std::make_shared<DataTypeInt8>();
else if (type[1] == 'U' || type[1] == 'S')
return std::make_shared<DataTypeString>();
else if (type == "O")
else if (type[1] == 'O')
throw Exception(ErrorCodes::BAD_ARGUMENTS, "ClickHouse doesn't support object types");
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Error while parsing data type");
}
std::vector<int> parseShape(String shapeString)
std::vector<int> parseShape(String shape_string)
{
shapeString.erase(std::remove(shapeString.begin(), shapeString.end(), '('), shapeString.end());
shapeString.erase(std::remove(shapeString.begin(), shapeString.end(), ')'), shapeString.end());
// Use a string stream to extract integers
String value;
shape_string.erase(std::remove(shape_string.begin(), shape_string.end(), '('), shape_string.end());
shape_string.erase(std::remove(shape_string.begin(), shape_string.end(), ')'), shape_string.end());
std::vector<std::string> result_str;
boost::split(result_str, shape_string, boost::is_any_of(","));
std::vector<int> shape;
size_t start = 0, end = 0;
while ((end = shapeString.find(',', start)) != std::string::npos)
{
shape.push_back(std::stoi(shapeString.substr(start, end - start)));
start = end + 1;
}
// Add the last token (or the only token if no delimiter is found)
if (start != shapeString.length())
shape.push_back(std::stoi(shapeString.substr(start)));
if (result_str[result_str.size()-1].empty())
result_str.pop_back();
shape.reserve(result_str.size());
for (const String& item : result_str)
shape.push_back(std::stoi(item));
return shape;
}
std::unordered_map<String, String> parseHeader(ReadBuffer &buf)
NumpyHeader parseHeader(ReadBuffer &buf)
{
/// Check magic bytes
const char * magic_string = "\x93NUMPY";
@ -187,7 +209,7 @@ std::unordered_map<String, String> parseHeader(ReadBuffer &buf)
/// Read map key.
String key;
readQuotedField(key, buf);
readQuotedString(key, buf);
assertChar(':', buf);
skipWhitespaceIfAny(buf);
/// Read map value.
@ -196,14 +218,14 @@ std::unordered_map<String, String> parseHeader(ReadBuffer &buf)
assertChar(',', buf);
skipWhitespaceIfAny(buf);
if (key == "'descr'")
if (key == "descr")
descr = value;
else if (key == "'fortran_order'")
else if (key == "fortran_order")
{
if (value != "false")
throw Exception(ErrorCodes::INCORRECT_DATA, "Fortran order is not supported");
}
else if (key == "'shape'")
else if (key == "shape")
shape = value;
}
@ -227,37 +249,38 @@ std::unordered_map<String, String> parseHeader(ReadBuffer &buf)
if (shape[shape.length() - 1] == '\'')
shape = shape.substr(0, shape.length() - 1);
std::unordered_map<String, String> header_data;
header_data["shape"] = shape;
header_data["descr"] = descr;
NumpyHeader res;
res.shape = parseShape(shape);
res.numpy_type = parseType(descr);
return header_data;
return res;
}
int parseStringSize(const std::string type)
DataTypePtr getNestedType(DataTypePtr type)
{
int size;
try
String a = type->getName();
while (const auto * temp_type = typeid_cast<const DataTypeArray *>(type.get()))
{
size = std::stoi(type.substr(2, type.length() - 2));
return size;
}
catch (...)
{
throw Exception(ErrorCodes::INCORRECT_DATA, "Invalid data type");
type = temp_type->getNestedType();
a = type->getName();
}
return type;
}
}
/// Parses the Npy header from the input buffer `in` and stores the result in `header`.
void NpyRowInputFormat::readPrefix()
{
header = parseHeader(*in);
}
NpyRowInputFormat::NpyRowInputFormat(ReadBuffer & in_, Block header_, Params params_)
: IRowInputFormat(std::move(header_), in_, std::move(params_))
{
header = parseHeader(*in);
endian = endianOrientation(header["descr"]);
shape = parseShape(header["shape"]);
nestedType = parseType(header["descr"]);
if (isString(nestedType))
sizeForStrings = parseStringSize(header["descr"]);
auto types = getPort().getHeader().getDataTypes();
if (types.size() != 1)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unexpected number of columns for Npy input format, expected one column, got {} columns", types.size());
nested_type = getNestedType(types[0]);
}
void NpyRowInputFormat::readRows(MutableColumns & columns)
@ -265,118 +288,144 @@ void NpyRowInputFormat::readRows(MutableColumns & columns)
auto & column = columns[0];
IColumn * current_column = column.get();
size_t elements_in_current_column = 1;
for (size_t i = 1; i != shape.size(); ++i)
for (size_t i = 1; i != header.shape.size(); ++i)
{
auto & array_column = assert_cast<ColumnArray &>(*current_column);
/// Fill offsets of array columns.
for (size_t j = 0; j != elements_in_current_column; ++j)
array_column.getOffsets().push_back(array_column.getOffsets().back() + shape[i]);
array_column.getOffsets().push_back(array_column.getOffsets().back() + header.shape[i]);
current_column = &array_column.getData();
elements_in_current_column *= shape[i];
elements_in_current_column *= header.shape[i];
}
for (size_t i = 0; i != elements_in_current_column; ++i)
readValueAndinsertIntoColumn(current_column->getPtr());
chooseType(current_column);
}
void NpyRowInputFormat::readValueAndinsertIntoColumn(MutableColumnPtr column)
template <typename ColumnValue, typename DataValue>
void NpyRowInputFormat::readBinaryValueAndInsert(MutableColumnPtr column, NumpyDataType::Endianness endianess)
{
if (auto * column_int8 = typeid_cast<ColumnInt8 *>(column.get()))
DataValue value;
if (endianess == NumpyDataType::Endianness::BIG)
readBinaryBigEndian(value, *in);
else
readBinaryLittleEndian(value, *in);
assert_cast<ColumnVector<ColumnValue> &>(*column).insertValue(static_cast<ColumnValue>(value));
}
void NpyRowInputFormat::readStringBinaryAndInsert(MutableColumnPtr column, size_t size, bool is_fixed)
{
if (is_fixed)
{
Int8 value = 0;
endian == 1 ? readBinaryBigEndian(value, *in) : readBinaryLittleEndian(value, *in);
column_int8->insertValue(value);
}
else if (auto * column_int16 = typeid_cast<ColumnInt16 *>(column.get()))
{
Int16 value = 0;
endian == 1 ? readBinaryBigEndian(value, *in) : readBinaryLittleEndian(value, *in);
column_int16->insertValue(value);
}
else if (auto * column_int32 = typeid_cast<ColumnInt32 *>(column.get()))
{
Int32 value = 0;
endian == 1 ? readBinaryBigEndian(value, *in) : readBinaryLittleEndian(value, *in);
column_int32->insertValue(value);
}
else if (auto * column_int64 = typeid_cast<ColumnInt64 *>(column.get()))
{
Int64 value = 0;
endian == 1 ? readBinaryBigEndian(value, *in) : readBinaryLittleEndian(value, *in);
column_int64->insertValue(value);
}
else if (auto * column_uint8 = typeid_cast<ColumnUInt8 *>(column.get()))
{
UInt8 value = 0;
endian == 1 ? readBinaryBigEndian(value, *in) : readBinaryLittleEndian(value, *in);
column_uint8->insertValue(value);
}
else if (auto * column_uint16 = typeid_cast<ColumnUInt16 *>(column.get()))
{
UInt16 value = 0;
endian == 1 ? readBinaryBigEndian(value, *in) : readBinaryLittleEndian(value, *in);
column_uint16->insertValue(value);
}
else if (auto * column_uint32 = typeid_cast<ColumnUInt32 *>(column.get()))
{
UInt32 value = 0;
endian == 1 ? readBinaryBigEndian(value, *in) : readBinaryLittleEndian(value, *in);
column_uint32->insertValue(value);
}
else if (auto * column_uint64 = typeid_cast<ColumnUInt64 *>(column.get()))
{
UInt64 value = 0;
endian == 1 ? readBinaryBigEndian(value, *in) : readBinaryLittleEndian(value, *in);
column_uint64->insertValue(value);
}
else if (auto * column_float32 = typeid_cast<ColumnFloat32 *>(column.get()))
{
Float32 value = 0;
endian == 1 ? readBinaryBigEndian(value, *in) : readBinaryLittleEndian(value, *in);
column_float32->insertValue(value);
}
else if (auto * column_float64 = typeid_cast<ColumnFloat64 *>(column.get()))
{
Float64 value = 0;
endian == 1 ? readBinaryBigEndian(value, *in) : readBinaryLittleEndian(value, *in);
column_float64->insertValue(value);
}
else if (auto * column_string = typeid_cast<ColumnString *>(column.get()))
{
size_t size = sizeForStrings;
auto & fixed_string_column = assert_cast<ColumnFixedString &>(*column);
size_t n = fixed_string_column.getN();
if (size > n)
throw Exception(ErrorCodes::TOO_LARGE_STRING_SIZE, "Too large string for FixedString column");
auto & data = fixed_string_column.getChars();
size_t old_size = data.size();
data.resize_fill(old_size + n);
String tmp;
tmp.resize(size);
in->readStrict(tmp.data(), size);
tmp.erase(std::remove(tmp.begin(), tmp.end(), '\0'), tmp.end());
fixed_string_column.insertData(tmp.c_str(), tmp.size());
}
else
{
auto & column_string = assert_cast<ColumnString &>(*column);
String tmp;
if (header["descr"][1] == 'U')
size = sizeForStrings * 4;
tmp.resize(size);
in->readStrict(tmp.data(), size);
tmp.erase(std::remove(tmp.begin(), tmp.end(), '\0'), tmp.end());
column_string->insertData(tmp.c_str(), tmp.size());
column_string.insertData(tmp.c_str(), tmp.size());
}
}
/// Reads one integer value of the NumPy type `npy_type` from the input and appends it,
/// converted to T, to `column` (which must be a ColumnVector<T>).
///
/// The value is read with the width AND signedness of the source type so that negative
/// values are sign-extended when T is wider than the source (e.g. an `i1` value of -1
/// inserted into an Int32 column stays -1 rather than becoming 255).
///
/// Throws ILLEGAL_COLUMN when `npy_type` is not an integer type; `data_type` is only used
/// for the error message.
template <typename T>
void NpyRowInputFormat::readAndInsertInteger(IColumn * column, const DataTypePtr & data_type, const NumpyDataType & npy_type)
{
    switch (npy_type.getTypeIndex())
    {
        case NumpyDataTypeIndex::Int8: readBinaryValueAndInsert<T, Int8>(column->getPtr(), npy_type.getEndianness()); break;
        case NumpyDataTypeIndex::Int16: readBinaryValueAndInsert<T, Int16>(column->getPtr(), npy_type.getEndianness()); break;
        case NumpyDataTypeIndex::Int32: readBinaryValueAndInsert<T, Int32>(column->getPtr(), npy_type.getEndianness()); break;
        case NumpyDataTypeIndex::Int64: readBinaryValueAndInsert<T, Int64>(column->getPtr(), npy_type.getEndianness()); break;
        case NumpyDataTypeIndex::UInt8: readBinaryValueAndInsert<T, UInt8>(column->getPtr(), npy_type.getEndianness()); break;
        case NumpyDataTypeIndex::UInt16: readBinaryValueAndInsert<T, UInt16>(column->getPtr(), npy_type.getEndianness()); break;
        case NumpyDataTypeIndex::UInt32: readBinaryValueAndInsert<T, UInt32>(column->getPtr(), npy_type.getEndianness()); break;
        case NumpyDataTypeIndex::UInt64: readBinaryValueAndInsert<T, UInt64>(column->getPtr(), npy_type.getEndianness()); break;
        default:
            throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Cannot insert data type into column with type {}",
                data_type->getName());
    }
}
/// Reads one floating-point value of the NumPy type `npy_type` from the input and appends it,
/// converted to T, to `column`.
/// Throws ILLEGAL_COLUMN when `npy_type` is neither Float32 nor Float64; `data_type` is only
/// used for the error message.
template <typename T>
void NpyRowInputFormat::readAndInsertFloat(IColumn * column, const DataTypePtr & data_type, const NumpyDataType & npy_type)
{
    const NumpyDataTypeIndex source_type = npy_type.getTypeIndex();

    if (source_type == NumpyDataTypeIndex::Float32)
    {
        readBinaryValueAndInsert<T, Float32>(column->getPtr(), npy_type.getEndianness());
        return;
    }

    if (source_type == NumpyDataTypeIndex::Float64)
    {
        readBinaryValueAndInsert<T, Float64>(column->getPtr(), npy_type.getEndianness());
        return;
    }

    throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Cannot insert data type into column with type {}",
        data_type->getName());
}
template <typename T>
void NpyRowInputFormat::readAndInsertString(MutableColumnPtr column, const DataTypePtr & data_type, const NumpyDataType & npy_type, bool is_fixed)
{
size_t size;
if (npy_type.getTypeIndex() == NumpyDataTypeIndex::String)
size = npy_type.getSize();
else if (npy_type.getTypeIndex() == NumpyDataTypeIndex::Unicode)
size = npy_type.getSize() * 4;
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Error while reading data");
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Cannot insert data type into column with type {}",
data_type->getName());
readStringBinaryAndInsert(column->getPtr(), size, is_fixed);
}
/// Dispatches on the destination column's element type (`nested_type`) and reads one value of
/// the header's NumPy type into `column` through the matching typed reader.
/// Throws UNKNOWN_TYPE when the destination type is not an integer, float, String or FixedString.
void NpyRowInputFormat::chooseType(IColumn * column)
{
    switch (nested_type->getTypeId())
    {
        /// Unsigned integer destinations.
        case TypeIndex::UInt8:
            readAndInsertInteger<UInt8>(column, nested_type, *header.numpy_type);
            break;
        case TypeIndex::UInt16:
            readAndInsertInteger<UInt16>(column, nested_type, *header.numpy_type);
            break;
        case TypeIndex::UInt32:
            readAndInsertInteger<UInt32>(column, nested_type, *header.numpy_type);
            break;
        case TypeIndex::UInt64:
            readAndInsertInteger<UInt64>(column, nested_type, *header.numpy_type);
            break;

        /// Signed integer destinations.
        case TypeIndex::Int8:
            readAndInsertInteger<Int8>(column, nested_type, *header.numpy_type);
            break;
        case TypeIndex::Int16:
            readAndInsertInteger<Int16>(column, nested_type, *header.numpy_type);
            break;
        case TypeIndex::Int32:
            readAndInsertInteger<Int32>(column, nested_type, *header.numpy_type);
            break;
        case TypeIndex::Int64:
            readAndInsertInteger<Int64>(column, nested_type, *header.numpy_type);
            break;

        /// Floating-point destinations.
        case TypeIndex::Float32:
            readAndInsertFloat<Float32>(column, nested_type, *header.numpy_type);
            break;
        case TypeIndex::Float64:
            readAndInsertFloat<Float64>(column, nested_type, *header.numpy_type);
            break;

        /// String destinations; the last argument tells the reader whether the column is FixedString.
        case TypeIndex::String:
            readAndInsertString<String>(column->getPtr(), nested_type, *header.numpy_type, false);
            break;
        case TypeIndex::FixedString:
            readAndInsertString<String>(column->getPtr(), nested_type, *header.numpy_type, true);
            break;

        default:
            throw Exception(ErrorCodes::UNKNOWN_TYPE, "Incorrect type of Npy");
    }
}
/// Reads the next row of the array into `columns`.
/// Returns false when the input is exhausted, true after a row has been read.
bool NpyRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & /*ext*/)
{
    if (in->eof())
        return false;

    /// The Npy payload is raw binary: a leading 0x0A byte is array data, not an empty text line,
    /// so it must not be skipped (doing so would drop a byte and report a row with no value inserted).
    readRows(columns);
    return true;
}
/// Resets the base-class parser state and clears the stored array shape.
void NpyRowInputFormat::resetParser()
{
IRowInputFormat::resetParser();
/// NOTE(review): the parsed shape also lives in `header.shape` (filled by parseHeader);
/// confirm that this is the member that should be cleared here.
shape.clear();
}
NpySchemaReader::NpySchemaReader(ReadBuffer & in_)
@ -401,11 +450,9 @@ NamesAndTypesList NpySchemaReader::readSchema()
return {};
}
auto header = parseHeader(in);
std::vector<int> shape = parseShape(header["shape"]);
DataTypePtr nested_type = parseType(header["descr"]);
DataTypePtr result_type = createDataType(shape.size(), nested_type);
NumpyHeader header = parseHeader(in);
DataTypePtr nested_type = getDataTypeFromNumpyType(header.numpy_type);
DataTypePtr result_type = createNestedArrayType(nested_type, header.shape.size());
return {{"array", result_type}};
}

View File

@ -7,10 +7,11 @@
#include <Processors/Formats/ISchemaReader.h>
#include <Formats/FormatSettings.h>
#include <Common/HashTable/HashMap.h>
#include "Columns/IColumn.h"
#include <Columns/IColumn.h>
#include <Core/Field.h>
#include <Core/NamesAndTypes.h>
#include <Core/Types.h>
#include <Formats/NumpyDataTypes.h>
using NpySizeT = uint32_t;
static const uint8_t NPY_DOCUMENT_END = 0x00;
@ -20,6 +21,12 @@ namespace DB
class ReadBuffer;
/// Parsed contents of a .npy header, as returned by parseHeader().
struct NumpyHeader
{
/// Array dimensions parsed from the header's `shape` entry.
std::vector<int> shape;
/// Element type parsed from the header's `descr` entry.
std::shared_ptr<NumpyDataType> numpy_type;
};
class NpyRowInputFormat final : public IRowInputFormat
{
public:
@ -30,18 +37,31 @@ public:
void resetParser() override;
private:
void readPrefix() override;
bool readRow(MutableColumns & columns, RowReadExtension &) override;
void readData(MutableColumns & columns);
template <typename T>
void readAndInsertInteger(IColumn * column, const DataTypePtr & data_type, const NumpyDataType & npy_type);
template <typename T>
void readAndInsertFloat(IColumn * column, const DataTypePtr & data_type, const NumpyDataType & npy_type);
template <typename T>
void readAndInsertString(MutableColumnPtr column, const DataTypePtr & data_type, const NumpyDataType & npy_type, bool is_fixed);
template <typename ColumnValue, typename DataValue>
void readBinaryValueAndInsert(MutableColumnPtr column, NumpyDataType::Endianness endianess);
void readStringBinaryAndInsert(MutableColumnPtr column, size_t size, bool is_fixed);
void readRows(MutableColumns & columns);
void readValueAndinsertIntoColumn(MutableColumnPtr column);
void chooseType(IColumn * column);
std::unordered_map<String, String> header;
std::vector<int> shape;
DataTypePtr nestedType;
int endian;
int sizeForStrings = 0;
std::shared_ptr<NumpyDataType> dataType;
DataTypePtr nested_type;
NumpyHeader header;
};
class NpySchemaReader : public ISchemaReader