diff --git a/src/Processors/Formats/Impl/NpyRowInputFormat.cpp b/src/Processors/Formats/Impl/NpyRowInputFormat.cpp index e0876459b20..6fcee686982 100644 --- a/src/Processors/Formats/Impl/NpyRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/NpyRowInputFormat.cpp @@ -14,6 +14,7 @@ #include #include #include "Columns/ColumnArray.h" +#include "Columns/ColumnsNumber.h" #include "Storages/IStorage.h" #include #include @@ -229,211 +230,62 @@ NpyRowInputFormat::NpyRowInputFormat(ReadBuffer & in_, Block header_, Params par { header = parseHeader(*in); endian = endianOrientation(header["descr"]); + shape = parseShape(header["shape"]); nestedType = parseType(header["descr"]); } void NpyRowInputFormat::readRows(MutableColumns & columns) { - auto & column = columns[0]; - IColumn * current_column = column.get(); - size_t total_elements_to_read = 1; - for (size_t i = 1; i != shape.size() - 1; ++i) - { - total_elements_to_read *= shape[i]; - auto & array_column = assert_cast(*column); - /// Fill offsets of array columns. - array_column.getOffsets().push_back(shape[i]); - current_column = &array_column.getData(); - } + auto & column = columns[0]; + IColumn * current_column = column.get(); + // size_t total_elements_to_read = 1; + for (size_t i = 1; i != shape.size() - 1; ++i) + { + // total_elements_to_read *= shape[i]; + auto & array_column = assert_cast(*column); + /// Fill offsets of array columns. + array_column.getOffsets().push_back(shape[i]); + current_column = &array_column.getData(); + } - for (int i = 0; i != shape[0]; ++i) + size_t total_elements_to_insert = 1; + for (size_t i = 1; i != shape.size() - 1; i++) + total_elements_to_insert *= shape[i]; + for (size_t i = 0; i != total_elements_to_insert; ++i) + { + readValueAndinsertIntoColumn(current_column->getPtr()); + [[maybe_unused]] size_t size = current_column->size(); + [[maybe_unused]] String str = current_column->dumpStructure(); + } +} + +void NpyRowInputFormat::readValueAndinsertIntoColumn([[maybe_unused]]MutableColumnPtr column) +{ + size_t to_insert = shape[shape.size() - 1]; + if (auto * column_array = typeid_cast(column.get())) { - for (size_t j = 0; j != total_elements_to_read; ++j) - readValueAndinsertIntoColumn(*current_column); - auto a = ColumnArray::create(current_column->getPtr()); - columns.push_back(a->getPtr()); + /// Обновляем оффсет + column_array->getOffsets().push_back(column_array->getOffsets().back() + to_insert); + /// Достаём вложенную колонку + auto nested_column = column_array->getData().getPtr(); + /// Проверяем что это и правда колонка UInt32 + if (auto * column_int64 = typeid_cast(nested_column.get())) + { + // Читаем из данных n значений и вставляем их во вложенную колонку + for (size_t i = 0; i != to_insert; ++i) + { + Int64 value = 0; + readBinaryLittleEndian(value, *in); + column_int64->insertValue(value); + } + } } } -void NpyRowInputFormat::readValueAndinsertIntoColumn(IColumn& column) + +void NpyRowInputFormat::readFromBuffer([[maybe_unused]]MutableColumns & columns) { - if (header["descr"] == "position() != '\n') - ++in->position(); - ++in->position(); - size_t total_size = 1; - for (int dim_size : shape) - total_size *= dim_size; - - for (size_t i = 0; i < total_size; i++) - { - if (in->eof()) - { - throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Unexpected end of stream in Npy format"); - } - else if (*in->position() == '\t') - { - ++in->position(); - continue; - } - else if (*in->position() == '\n') - { - ++in->position(); - break; - } - - readRows(columns); - } + readRows(columns); } bool NpyRowInputFormat::readRow([[maybe_unused]]MutableColumns & columns, RowReadExtension & /*ext*/) @@ -441,9 +293,9 @@ bool NpyRowInputFormat::readRow([[maybe_unused]]MutableColumns & columns, RowRea if (in->eof()) return false; - while (*in->position() != '\n') - ++in->position(); - ++in->position(); + // while (*in->position() != '\n') + // ++in->position(); + // ++in->position(); if (unlikely(*in->position() == '\n')) { @@ -495,7 +347,7 @@ NamesAndTypesList NpySchemaReader::readSchema() void registerInputFormatNpy(FormatFactory & factory) { - factory.registerInputFormat("npy", []( + factory.registerInputFormat("Npy", []( ReadBuffer & buf, const Block & sample, IRowInputFormat::Params params, @@ -504,7 +356,7 @@ void registerInputFormatNpy(FormatFactory & factory) return std::make_shared(buf, sample, std::move(params)); }); - factory.markFormatSupportsSubsetOfColumns("npy"); + factory.markFormatSupportsSubsetOfColumns("Npy"); } void registerNpySchemaReader(FormatFactory & factory) { diff --git a/src/Processors/Formats/Impl/NpyRowInputFormat.h b/src/Processors/Formats/Impl/NpyRowInputFormat.h index 6ff8c0dad59..be20c8aba0b 100644 --- a/src/Processors/Formats/Impl/NpyRowInputFormat.h +++ b/src/Processors/Formats/Impl/NpyRowInputFormat.h @@ -37,7 +37,7 @@ private: void readRows(MutableColumns & columns); - void readValueAndinsertIntoColumn(IColumn& column); + void readValueAndinsertIntoColumn(MutableColumnPtr column); std::unordered_map header; std::vector shape;