suggested changes

This commit is contained in:
yariks5s 2023-10-28 01:17:25 +00:00
parent 23635352f1
commit 894724bfb3
4 changed files with 119 additions and 127 deletions

View File

@ -2451,7 +2451,7 @@ Result:
This function is designed to load a NumPy array from a .npy file into ClickHouse. The NumPy file format is a binary format used for efficiently storing arrays of numerical data. During import, ClickHouse treats top level dimension as an array of rows with single column. Supported Npy data types and their corresponding type in ClickHouse:
| Npy type | ClickHouse type |
|:--------:|:---------------:|
| b1 | Int8 |
| b1 | UInt8 |
| i1 | Int8 |
| i2 | Int16 |
| i4 | Int32 |

View File

@ -1,6 +1,12 @@
#pragma once
#include <cstddef>
#include <Storages/NamedCollectionsHelpers.h>
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
enum class NumpyDataTypeIndex
{
Int8,
@ -20,42 +26,46 @@ enum class NumpyDataTypeIndex
class NumpyDataType
{
public:
enum Endianness
enum Endianess
{
LITTLE,
BIG,
NONE,
};
NumpyDataTypeIndex type_index;
explicit NumpyDataType(Endianness endianess_) : endianess(endianess_) {}
explicit NumpyDataType(Endianess endianess_) : endianess(endianess_) {}
virtual ~NumpyDataType() = default;
Endianness getEndianness() const { return endianess; }
Endianess getEndianness() const { return endianess; }
virtual NumpyDataTypeIndex getTypeIndex() const = 0;
virtual size_t getSize() const = 0;
virtual NumpyDataTypeIndex getTypeIndex() const = 0;
private:
Endianness endianess;
Endianess endianess;
};
class NumpyDataTypeInt : public NumpyDataType
{
public:
NumpyDataTypeInt(Endianness endianess, size_t size_, bool is_signed_) : NumpyDataType(endianess), size(size_), is_signed(is_signed_) {}
NumpyDataTypeIndex getTypeIndex() const override
NumpyDataTypeInt(Endianess endianess, size_t size_, bool is_signed_) : NumpyDataType(endianess), size(size_), is_signed(is_signed_)
{
switch (size)
{
case 1: return is_signed ? NumpyDataTypeIndex::Int8 : NumpyDataTypeIndex::UInt8;
case 2: return is_signed ? NumpyDataTypeIndex::Int16 : NumpyDataTypeIndex::UInt16;
case 4: return is_signed ? NumpyDataTypeIndex::Int32 : NumpyDataTypeIndex::UInt32;
case 8: return is_signed ? NumpyDataTypeIndex::Int64 : NumpyDataTypeIndex::UInt64;
case 1: type_index = is_signed ? NumpyDataTypeIndex::Int8 : NumpyDataTypeIndex::UInt8; break;
case 2: type_index = is_signed ? NumpyDataTypeIndex::Int16 : NumpyDataTypeIndex::UInt16; break;
case 4: type_index = is_signed ? NumpyDataTypeIndex::Int32 : NumpyDataTypeIndex::UInt32; break;
case 8: type_index = is_signed ? NumpyDataTypeIndex::Int64 : NumpyDataTypeIndex::UInt64; break;
default:
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Incorrect int type with size {}", size);
}
}
NumpyDataTypeIndex getTypeIndex() const override
{
return type_index;
}
size_t getSize() const override { return size; }
bool isSigned() const { return is_signed; }
@ -67,18 +77,21 @@ private:
class NumpyDataTypeFloat : public NumpyDataType
{
public:
NumpyDataTypeFloat(Endianness endianess, size_t size_) : NumpyDataType(endianess), size(size_) {}
NumpyDataTypeIndex getTypeIndex() const override
NumpyDataTypeFloat(Endianess endianess, size_t size_) : NumpyDataType(endianess), size(size_)
{
switch (size)
{
case 4: return NumpyDataTypeIndex::Float32;
case 8: return NumpyDataTypeIndex::Float64;
case 4: type_index = NumpyDataTypeIndex::Float32; break;
case 8: type_index = NumpyDataTypeIndex::Float64; break;
default:
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Incorrect float type with size {}", size);
}
}
NumpyDataTypeIndex getTypeIndex() const override
{
return type_index;
}
size_t getSize() const override { return size; }
private:
@ -88,9 +101,12 @@ private:
class NumpyDataTypeString : public NumpyDataType
{
public:
NumpyDataTypeString(Endianness endianess, size_t size_) : NumpyDataType(endianess), size(size_) {}
NumpyDataTypeString(Endianess endianess, size_t size_) : NumpyDataType(endianess), size(size_)
{
type_index = NumpyDataTypeIndex::String;
}
NumpyDataTypeIndex getTypeIndex() const override { return NumpyDataTypeIndex::String; }
NumpyDataTypeIndex getTypeIndex() const override { return type_index; }
size_t getSize() const override { return size; }
private:
size_t size;
@ -99,10 +115,13 @@ private:
class NumpyDataTypeUnicode : public NumpyDataType
{
public:
NumpyDataTypeUnicode(Endianness endianess, size_t size_) : NumpyDataType(endianess), size(size_) {}
NumpyDataTypeUnicode(Endianess endianess, size_t size_) : NumpyDataType(endianess), size(size_)
{
type_index = NumpyDataTypeIndex::Unicode;
}
NumpyDataTypeIndex getTypeIndex() const override { return NumpyDataTypeIndex::Unicode; }
size_t getSize() const override { return size; }
NumpyDataTypeIndex getTypeIndex() const override { return type_index; }
size_t getSize() const override { return size * 4; }
private:
size_t size;
};

View File

@ -15,6 +15,7 @@
#include <Common/assert_cast.h>
#include <Common/typeid_cast.h>
#include <Common/Exception.h>
#include "Formats/NumpyDataTypes.h"
#include <Columns/ColumnFixedString.h>
#include <Core/TypeId.h>
#include <Core/Types_fwd.h>
@ -34,6 +35,7 @@
#include <Processors/Formats/IRowInputFormat.h>
#include <base/types.h>
#include <boost/algorithm/string/split.hpp>
#include <IO/ReadBufferFromString.h>
namespace DB
@ -43,7 +45,7 @@ namespace ErrorCodes
{
extern const int INCORRECT_DATA;
extern const int BAD_ARGUMENTS;
extern const int LOGICAL_ERROR;
extern const int TOO_LARGE_STRING_SIZE;
extern const int UNKNOWN_TYPE;
extern const int ILLEGAL_COLUMN;
}
@ -95,29 +97,29 @@ DataTypePtr createNestedArrayType(const DataTypePtr & nested_type, size_t depth)
return result_type;
}
size_t parseTypeSize(const std::string type)
size_t parseTypeSize(const std::string & size_str)
{
try
{
size_t size = std::stoi(type);
size_t size = std::stoi(size_str);
return size;
}
catch (...)
{
throw Exception(ErrorCodes::INCORRECT_DATA, "Invalid data type");
throw Exception(ErrorCodes::INCORRECT_DATA, "Invalid data type size: {}", size_str);
}
}
std::shared_ptr<NumpyDataType> parseType(String type)
{
/// Parse endianess
NumpyDataType::Endianness endianess;
NumpyDataType::Endianess endianess;
if (type[0] == '<')
endianess = NumpyDataType::Endianness::LITTLE;
endianess = NumpyDataType::Endianess::LITTLE;
else if (type[1] == '>')
endianess = NumpyDataType::Endianness::BIG;
endianess = NumpyDataType::Endianess::BIG;
else if (type[0] == '|')
endianess = NumpyDataType::Endianness::NONE;
endianess = NumpyDataType::Endianess::NONE;
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Wrong header data");
@ -139,22 +141,32 @@ std::shared_ptr<NumpyDataType> parseType(String type)
else if (type[1] == 'O')
throw Exception(ErrorCodes::BAD_ARGUMENTS, "ClickHouse doesn't support object types");
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Error while parsing data type");
throw Exception(ErrorCodes::BAD_ARGUMENTS, "ClickHouse doesn't support numpy type '{}'", type);
}
std::vector<int> parseShape(String shape_string)
{
shape_string.erase(std::remove(shape_string.begin(), shape_string.end(), '('), shape_string.end());
shape_string.erase(std::remove(shape_string.begin(), shape_string.end(), ')'), shape_string.end());
if (!shape_string.starts_with('(') || !shape_string.ends_with(')'))
throw Exception(ErrorCodes::INCORRECT_DATA, "Incorrect shape format: {}", shape_string);
std::vector<std::string> result_str;
boost::split(result_str, shape_string, boost::is_any_of(","));
boost::split(result_str, std::string_view(shape_string.data() + 1, shape_string.size() - 2), boost::is_any_of(","));
std::vector<int> shape;
if (result_str[result_str.size()-1].empty())
result_str.pop_back();
shape.reserve(result_str.size());
for (const String& item : result_str)
shape.push_back(std::stoi(item));
bool is_first_elem = true;
for (const String & item : result_str)
{
int value;
ReadBufferFromString buf(item);
if (!is_first_elem)
assertString(" ", buf);
if (!tryReadIntText(value, buf))
throw Exception(ErrorCodes::INCORRECT_DATA, "Invalid shape format: {}", shape_string);
shape.push_back(value);
is_first_elem = false;
}
return shape;
}
@ -258,12 +270,8 @@ NumpyHeader parseHeader(ReadBuffer &buf)
DataTypePtr getNestedType(DataTypePtr type)
{
String a = type->getName();
while (const auto * temp_type = typeid_cast<const DataTypeArray *>(type.get()))
{
type = temp_type->getNestedType();
a = type->getName();
}
return type;
}
@ -283,68 +291,17 @@ NpyRowInputFormat::NpyRowInputFormat(ReadBuffer & in_, Block header_, Params par
nested_type = getNestedType(types[0]);
}
void NpyRowInputFormat::readRows(MutableColumns & columns)
{
auto & column = columns[0];
IColumn * current_column = column.get();
size_t elements_in_current_column = 1;
for (size_t i = 1; i != header.shape.size(); ++i)
{
auto & array_column = assert_cast<ColumnArray &>(*current_column);
/// Fill offsets of array columns.
for (size_t j = 0; j != elements_in_current_column; ++j)
array_column.getOffsets().push_back(array_column.getOffsets().back() + header.shape[i]);
current_column = &array_column.getData();
elements_in_current_column *= header.shape[i];
}
for (size_t i = 0; i != elements_in_current_column; ++i)
chooseType(current_column);
}
template <typename ColumnValue, typename DataValue>
void NpyRowInputFormat::readBinaryValueAndInsert(MutableColumnPtr column, NumpyDataType::Endianness endianess)
void NpyRowInputFormat::readBinaryValueAndInsert(MutableColumnPtr column, NumpyDataType::Endianess endianess)
{
DataValue value;
if (endianess == NumpyDataType::Endianness::BIG)
if (endianess == NumpyDataType::Endianess::BIG)
readBinaryBigEndian(value, *in);
else
readBinaryLittleEndian(value, *in);
assert_cast<ColumnVector<ColumnValue> &>(*column).insertValue(static_cast<ColumnValue>(value));
}
void NpyRowInputFormat::readStringBinaryAndInsert(MutableColumnPtr column, size_t size, bool is_fixed)
{
if (is_fixed)
{
auto & fixed_string_column = assert_cast<ColumnFixedString &>(*column);
size_t n = fixed_string_column.getN();
if (size > n)
throw Exception(ErrorCodes::TOO_LARGE_STRING_SIZE, "Too large string for FixedString column");
auto & data = fixed_string_column.getChars();
size_t old_size = data.size();
data.resize_fill(old_size + n);
String tmp;
tmp.resize(size);
in->readStrict(tmp.data(), size);
tmp.erase(std::remove(tmp.begin(), tmp.end(), '\0'), tmp.end());
fixed_string_column.insertData(tmp.c_str(), tmp.size());
}
else
{
auto & column_string = assert_cast<ColumnString &>(*column);
String tmp;
tmp.resize(size);
in->readStrict(tmp.data(), size);
tmp.erase(std::remove(tmp.begin(), tmp.end(), '\0'), tmp.end());
column_string.insertData(tmp.c_str(), tmp.size());
}
}
template <typename T>
void NpyRowInputFormat::readAndInsertInteger(IColumn * column, const DataTypePtr & data_type, const NumpyDataType & npy_type)
{
@ -381,19 +338,42 @@ template <typename T>
void NpyRowInputFormat::readAndInsertString(MutableColumnPtr column, const DataTypePtr & data_type, const NumpyDataType & npy_type, bool is_fixed)
{
size_t size;
if (npy_type.getTypeIndex() == NumpyDataTypeIndex::String)
size = npy_type.getSize();
else if (npy_type.getTypeIndex() == NumpyDataTypeIndex::Unicode)
size = npy_type.getSize() * 4;
size = npy_type.getSize();
else
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Cannot insert data type into column with type {}",
data_type->getName());
readStringBinaryAndInsert(column->getPtr(), size, is_fixed);
if (is_fixed)
{
auto & fixed_string_column = assert_cast<ColumnFixedString &>(*column);
size_t n = fixed_string_column.getN();
if (size > n)
throw Exception(ErrorCodes::TOO_LARGE_STRING_SIZE, "Too large string for FixedString column");
fixed_string_column.getChars().resize_fill(fixed_string_column.getChars().size() + n);
String tmp;
tmp.resize(size);
in->readStrict(tmp.data(), size);
tmp.erase(std::remove(tmp.begin(), tmp.end(), '\0'), tmp.end());
fixed_string_column.insertData(tmp.c_str(), tmp.size());
}
else
{
auto & column_string = assert_cast<ColumnString &>(*column);
String tmp;
tmp.resize(size);
in->readStrict(tmp.data(), size);
tmp.erase(std::remove(tmp.begin(), tmp.end(), '\0'), tmp.end());
column_string.insertData(tmp.c_str(), tmp.size());
}
}
void NpyRowInputFormat::chooseType(IColumn * column)
void NpyRowInputFormat::readValue(IColumn * column)
{
switch (nested_type->getTypeId())
{
@ -410,7 +390,7 @@ void NpyRowInputFormat::chooseType(IColumn * column)
case TypeIndex::String: readAndInsertString<String>(column->getPtr(), nested_type, *header.numpy_type, false); break;
case TypeIndex::FixedString: readAndInsertString<String>(column->getPtr(), nested_type, *header.numpy_type, true); break;
default:
throw Exception(ErrorCodes::UNKNOWN_TYPE, "Incorrect type of Npy");
throw Exception(ErrorCodes::UNKNOWN_TYPE, "ClickHouse type {} is not supported for import from Npy format", nested_type->getName());
}
}
@ -418,8 +398,25 @@ bool NpyRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & /*
{
if (in->eof())
return false;
else
readRows(columns);
auto & column = columns[0];
IColumn * current_column = column.get();
size_t elements_in_current_column = 1;
for (size_t i = 1; i != header.shape.size(); ++i)
{
auto * array_column = typeid_cast<ColumnArray *>(current_column);
if (!array_column)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unexpected nesting level of column '{}', expected {}, got {}", column->getName(), header.shape.size() - 1, i - 1);
/// Fill offsets of array columns.
for (size_t j = 0; j != elements_in_current_column; ++j)
array_column->getOffsets().push_back(array_column->getOffsets().back() + header.shape[i]);
current_column = &array_column->getData();
elements_in_current_column *= header.shape[i];
}
for (size_t i = 0; i != elements_in_current_column; ++i)
readValue(current_column);
return true;
}
@ -433,23 +430,6 @@ NpySchemaReader::NpySchemaReader(ReadBuffer & in_)
NamesAndTypesList NpySchemaReader::readSchema()
{
if (first_row)
{
skipBOMIfExists(in);
first_row = false;
}
if (in.eof())
{
return {};
}
if (*in.position() == '\n')
{
++in.position();
return {};
}
NumpyHeader header = parseHeader(in);
DataTypePtr nested_type = getDataTypeFromNumpyType(header.numpy_type);
DataTypePtr result_type = createNestedArrayType(nested_type, header.shape.size());

View File

@ -51,15 +51,12 @@ private:
void readAndInsertString(MutableColumnPtr column, const DataTypePtr & data_type, const NumpyDataType & npy_type, bool is_fixed);
template <typename ColumnValue, typename DataValue>
void readBinaryValueAndInsert(MutableColumnPtr column, NumpyDataType::Endianness endianess);
void readStringBinaryAndInsert(MutableColumnPtr column, size_t size, bool is_fixed);
void readBinaryValueAndInsert(MutableColumnPtr column, NumpyDataType::Endianess endianess);
void readRows(MutableColumns & columns);
void chooseType(IColumn * column);
void readValue(IColumn * column);
std::shared_ptr<NumpyDataType> dataType;
DataTypePtr nested_type;
NumpyHeader header;
};
@ -69,12 +66,8 @@ class NpySchemaReader : public ISchemaReader
public:
explicit NpySchemaReader(ReadBuffer & in_);
std::unordered_map<String, String> getHeader();
private:
NamesAndTypesList readSchema() override;
bool first_row = true;
};
}