dealt with 2dim arrays

This commit is contained in:
yariks5s 2023-10-20 17:05:05 +00:00
parent 6dc88a4ca4
commit 87f26f5132
2 changed files with 52 additions and 200 deletions

View File

@ -14,6 +14,7 @@
#include <DataTypes/DataTypeString.h> #include <DataTypes/DataTypeString.h>
#include <Common/Exception.h> #include <Common/Exception.h>
#include "Columns/ColumnArray.h" #include "Columns/ColumnArray.h"
#include "Columns/ColumnsNumber.h"
#include "Storages/IStorage.h" #include "Storages/IStorage.h"
#include <Columns/IColumn.h> #include <Columns/IColumn.h>
#include <Core/Field.h> #include <Core/Field.h>
@ -229,211 +230,62 @@ NpyRowInputFormat::NpyRowInputFormat(ReadBuffer & in_, Block header_, Params par
{ {
header = parseHeader(*in); header = parseHeader(*in);
endian = endianOrientation(header["descr"]); endian = endianOrientation(header["descr"]);
shape = parseShape(header["shape"]);
nestedType = parseType(header["descr"]); nestedType = parseType(header["descr"]);
} }
void NpyRowInputFormat::readRows(MutableColumns & columns) void NpyRowInputFormat::readRows(MutableColumns & columns)
{ {
auto & column = columns[0]; auto & column = columns[0];
IColumn * current_column = column.get(); IColumn * current_column = column.get();
size_t total_elements_to_read = 1; // size_t total_elements_to_read = 1;
for (size_t i = 1; i != shape.size() - 1; ++i) for (size_t i = 1; i != shape.size() - 1; ++i)
{ {
total_elements_to_read *= shape[i]; // total_elements_to_read *= shape[i];
auto & array_column = assert_cast<ColumnArray &>(*column); auto & array_column = assert_cast<ColumnArray &>(*column);
/// Fill offsets of array columns. /// Fill offsets of array columns.
array_column.getOffsets().push_back(shape[i]); array_column.getOffsets().push_back(shape[i]);
current_column = &array_column.getData(); current_column = &array_column.getData();
} }
for (int i = 0; i != shape[0]; ++i) size_t total_elements_to_insert = 1;
for (size_t i = 1; i != shape.size() - 1; i++)
total_elements_to_insert *= shape[i];
for (size_t i = 0; i != total_elements_to_insert; ++i)
{
readValueAndinsertIntoColumn(current_column->getPtr());
[[maybe_unused]] size_t size = current_column->size();
[[maybe_unused]] String str = current_column->dumpStructure();
}
}
void NpyRowInputFormat::readValueAndinsertIntoColumn([[maybe_unused]]MutableColumnPtr column)
{
size_t to_insert = shape[shape.size() - 1];
if (auto * column_array = typeid_cast<ColumnArray *>(column.get()))
{ {
for (size_t j = 0; j != total_elements_to_read; ++j) /// Обновляем оффсет
readValueAndinsertIntoColumn(*current_column); column_array->getOffsets().push_back(column_array->getOffsets().back() + to_insert);
auto a = ColumnArray::create(current_column->getPtr()); /// Достаём вложенную колонку
columns.push_back(a->getPtr()); auto nested_column = column_array->getData().getPtr();
/// Проверяем что это и правда колонка UInt32
if (auto * column_int64 = typeid_cast<ColumnInt64 *>(nested_column.get()))
{
// Читаем из данных n значений и вставляем их во вложенную колонку
for (size_t i = 0; i != to_insert; ++i)
{
Int64 value = 0;
readBinaryLittleEndian(value, *in);
column_int64->insertValue(value);
}
}
} }
} }
void NpyRowInputFormat::readValueAndinsertIntoColumn(IColumn& column)
void NpyRowInputFormat::readFromBuffer([[maybe_unused]]MutableColumns & columns)
{ {
if (header["descr"] == "<i1") readRows(columns);
{
DataTypeInt8 value;
if (endian == -1)
readBinaryLittleEndian(value, *in);
else if (endian == 1)
readBinaryBigEndian(value, *in);
// else if (endian == 0)
// readBinary(value, *in);
column.insertData(value);
}
else if (header["descr"] == "<i2")
{
DataTypeInt16 value;
if (endian == -1)
readBinaryLittleEndian(value, *in);
else if (endian == 1)
readBinaryBigEndian(value, *in);
// else if (endian == 0)
// readBinary(value, *in);
column.insertData(value);
}
else if (header["descr"] == "<i4")
{
DataTypeInt32 value;
if (endian == -1)
readBinaryLittleEndian(value, *in);
else if (endian == 1)
readBinaryBigEndian(value, *in);
// else if (endian == 0)
// readBinary(value, *in);
column.insertData(value);
}
else if (header["descr"] == "<i8")
{
DataTypeInt64 value;
if (endian == -1)
readBinaryLittleEndian(value, *in);
else if (endian == 1)
readBinaryBigEndian(value, *in);
// else if (endian == 0)
// readBinary(value, *in);
column.insertData(value);
}
else if (header["descr"] == "<u1")
{
DataTypeUInt8 value;
if (endian == -1)
readBinaryLittleEndian(value, *in);
else if (endian == 1)
readBinaryBigEndian(value, *in);
// else if (endian == 0)
// readBinary(value, *in);
column.insertData(value);
}
else if (header["descr"] == "<u2")
{
DataTypeUInt16 value;
if (endian == -1)
readBinaryLittleEndian(value, *in);
else if (endian == 1)
readBinaryBigEndian(value, *in);
// else if (endian == 0)
// readBinary(value, *in);
column.insertData(value);
}
else if (header["descr"] == "<u4")
{
DataTypeUInt32 value;
if (endian == -1)
readBinaryLittleEndian(value, *in);
else if (endian == 1)
readBinaryBigEndian(value, *in);
// else if (endian == 0)
// readBinary(value, *in);
column.insertData(value);
}
else if (header["descr"] == "<u8")
{
DataTypeUInt64 value;
if (endian == -1)
readBinaryLittleEndian(value, *in);
else if (endian == 1)
readBinaryBigEndian(value, *in);
// else if (endian == 0)
// readBinary(value, *in);
column.insertData(value);
}
else if (header["descr"] == "<f2")
{
DataTypeFloat32 value;
if (endian == -1)
readBinaryLittleEndian(value, *in);
else if (endian == 1)
readBinaryBigEndian(value, *in);
// else if (endian == 0)
// readBinary(value, *in);
column.insertData(value);
}
else if (header["descr"] == "<f4")
{
DataTypeFloat32 value;
if (endian == -1)
readBinaryLittleEndian(value, *in);
else if (endian == 1)
readBinaryBigEndian(value, *in);
// else if (endian == 0)
// readBinary(value, *in);
column.insertData(value);
} /// we dont support size of one of floats here.
else if (header["descr"] == "<f8")
{
DataTypeFloat64 value;
if (endian == -1)
readBinaryLittleEndian(value, *in);
else if (endian == 1)
readBinaryBigEndian(value, *in);
// else if (endian == 0)
// readBinary(value, *in);
column.insertData(value);
}
else if (header["descr"] == "<c8" || header["descr"] == "<c16")
throw Exception(ErrorCodes::BAD_ARGUMENTS, "ClickHouse doesn't support complex numeric type");
else if (header["descr"] == "|b1")
{
DataTypeInt8 value;
if (endian == -1)
readBinaryLittleEndian(value, *in);
else if (endian == 1)
readBinaryBigEndian(value, *in);
// else if (endian == 0)
// readBinary(value, *in);
column.insertData(value);
} /// Not sure that its good idea
else if (header["descr"] == "<U10" || header["descr"] == "<U20" || header["descr"] == "<U21")
{
String value;
if (endian == -1)
readStringBinary(value, *in);
column.insert(value);
}
else if (header["descr"] == "O")
throw Exception(ErrorCodes::BAD_ARGUMENTS, "ClickHouse doesn't support object types");
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Error while parsing data type");
}
void NpyRowInputFormat::readFromBuffer(MutableColumns & columns)
{
while (*in->position() != '\n')
++in->position();
++in->position();
size_t total_size = 1;
for (int dim_size : shape)
total_size *= dim_size;
for (size_t i = 0; i < total_size; i++)
{
if (in->eof())
{
throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Unexpected end of stream in Npy format");
}
else if (*in->position() == '\t')
{
++in->position();
continue;
}
else if (*in->position() == '\n')
{
++in->position();
break;
}
readRows(columns);
}
} }
bool NpyRowInputFormat::readRow([[maybe_unused]]MutableColumns & columns, RowReadExtension & /*ext*/) bool NpyRowInputFormat::readRow([[maybe_unused]]MutableColumns & columns, RowReadExtension & /*ext*/)
@ -441,9 +293,9 @@ bool NpyRowInputFormat::readRow([[maybe_unused]]MutableColumns & columns, RowRea
if (in->eof()) if (in->eof())
return false; return false;
while (*in->position() != '\n') // while (*in->position() != '\n')
++in->position(); // ++in->position();
++in->position(); // ++in->position();
if (unlikely(*in->position() == '\n')) if (unlikely(*in->position() == '\n'))
{ {
@ -495,7 +347,7 @@ NamesAndTypesList NpySchemaReader::readSchema()
void registerInputFormatNpy(FormatFactory & factory) void registerInputFormatNpy(FormatFactory & factory)
{ {
factory.registerInputFormat("npy", []( factory.registerInputFormat("Npy", [](
ReadBuffer & buf, ReadBuffer & buf,
const Block & sample, const Block & sample,
IRowInputFormat::Params params, IRowInputFormat::Params params,
@ -504,7 +356,7 @@ void registerInputFormatNpy(FormatFactory & factory)
return std::make_shared<NpyRowInputFormat>(buf, sample, std::move(params)); return std::make_shared<NpyRowInputFormat>(buf, sample, std::move(params));
}); });
factory.markFormatSupportsSubsetOfColumns("npy"); factory.markFormatSupportsSubsetOfColumns("Npy");
} }
void registerNpySchemaReader(FormatFactory & factory) void registerNpySchemaReader(FormatFactory & factory)
{ {

View File

@ -37,7 +37,7 @@ private:
void readRows(MutableColumns & columns); void readRows(MutableColumns & columns);
void readValueAndinsertIntoColumn(IColumn& column); void readValueAndinsertIntoColumn(MutableColumnPtr column);
std::unordered_map<String, String> header; std::unordered_map<String, String> header;
std::vector<int> shape; std::vector<int> shape;