From 6dc88a4ca4d316177901b03bc064ed3d8f8943dc Mon Sep 17 00:00:00 2001 From: yariks5s Date: Wed, 18 Oct 2023 18:02:05 +0000 Subject: [PATCH] new changes --- .../Formats/Impl/NpyRowInputFormat.cpp | 649 +++++++++++------- .../Formats/Impl/NpyRowInputFormat.h | 53 +- 2 files changed, 415 insertions(+), 287 deletions(-) diff --git a/src/Processors/Formats/Impl/NpyRowInputFormat.cpp b/src/Processors/Formats/Impl/NpyRowInputFormat.cpp index 5dd372c1fde..e0876459b20 100644 --- a/src/Processors/Formats/Impl/NpyRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/NpyRowInputFormat.cpp @@ -1,7 +1,9 @@ #include +#include #include #include #include +#include #include #include #include @@ -10,16 +12,20 @@ #include #include #include -#include "Common/Exception.h" -#include "Columns/IColumn.h" -#include "Core/Field.h" -#include "DataTypes/DataTypesNumber.h" -#include "DataTypes/IDataType.h" -#include "DataTypes/Serializations/ISerialization.h" -#include "IO/ReadBuffer.h" -#include "IO/WriteHelpers.h" -#include "Processors/Formats/IRowInputFormat.h" -#include "base/types.h" +#include +#include "Columns/ColumnArray.h" +#include "Storages/IStorage.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include namespace DB @@ -34,179 +40,73 @@ namespace ErrorCodes } -NpyRowInputFormat::NpyRowInputFormat(ReadBuffer & in_, Block header_, Params params_, const FormatSettings & format_settings_) - : IRowInputFormat(std::move(header_), in_, std::move(params_)), format_settings(format_settings_), name_map(getPort().getHeader().columns()) +DataTypePtr createDataType(size_t depth, DataTypePtr nested_type) { - const auto & sample_block = getPort().getHeader(); - size_t num_columns = sample_block.columns(); - for (size_t i = 0; i < num_columns; ++i) - name_map[sample_block.getByPosition(i).name] = i; + DataTypePtr result_type = nested_type; + + assert(depth > 1); + for (size_t i = 0; i < depth - 1; ++i) + result_type = std::make_shared(std::move(result_type)); + return result_type; } -template -void readFromBuffer(ReadBuffer &in, MutableColumns & /*columns*/, std::vector shape) -{ - while (*in.position() != '\n') - ++in.position(); - ++in.position(); - size_t total_size = 1; - for (int dim_size : shape) - total_size *= dim_size; +/* +Checks, in what endian format data was written. +return -1: if data is written in little-endian; - for (size_t i = 0; i < total_size; i++) - { - if (in.eof()) - { - throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Unexpected end of stream in Npy format"); - } - else if (*in.position() == '\t') - { - ++in.position(); - continue; - } - else if (*in.position() == '\n') - { - ++in.position(); - break; - } - - T value; - readBinaryLittleEndian(value, in); - } + 1: if data is written in big-endian; + + 0: if data is written in no-endian. */ +int endianOrientation(String descr) +{ + if (descr.length() < 3) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Descr field length must be bigger or equal 3"); + if (descr[0] == '<') + return -1; + else if (descr[0] == '>') + return 1; + else if (descr[0] == '|') + return 0; + else + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Wrong content of field descr"); } -template -void readStringFromBuffer(ReadBuffer &in, std::vector shape) -{ - while (*in.position() != '\n') - ++in.position(); - size_t total_size = 1; - for (int dim_size : shape) - total_size *= dim_size; - - for (size_t i = 0; i < total_size; i++) - { - if (in.eof()) - { - throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Unexpected end of stream in Npy format"); - } - else if (*in.position() == '\t') - { - ++in.position(); - continue; - } - // else if (*in.position() == '\n') - // { - // ++in.position(); - // break; - // } - - T value; - readStringBinary(value, in); - std::cout << value << std::endl; - } -} - -void readAndParseType(String type, ReadBuffer &in, MutableColumns & columns, std::vector shape) //is ok +DataTypePtr parseType(String type) { if (type == "(in, columns, shape); + return std::make_shared(); else if (type == "(in, columns, shape); + return std::make_shared(); else if (type == "(in, columns, shape); + return std::make_shared(); else if (type == "(in, columns, shape); + return std::make_shared(); else if (type == "(in, columns, shape); + return std::make_shared(); else if (type == "(in, columns, shape); + return std::make_shared(); else if (type == "(in, columns, shape); + return std::make_shared(); else if (type == "(in, columns, shape); + return std::make_shared(); else if (type == "(in, columns, shape); + return std::make_shared(); else if (type == "(in, columns, shape); + return std::make_shared(); else if (type == "(in, columns, shape); + return std::make_shared(); else if (type == "(in, columns, shape); + return std::make_shared(); else if (type == "(in, shape); + return std::make_shared(); else if (type == "O") throw Exception(ErrorCodes::BAD_ARGUMENTS, "ClickHouse doesn't support object types"); else throw Exception(ErrorCodes::BAD_ARGUMENTS, "Error while parsing data type"); } -bool NpyRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & /*ext*/) -{ - if (in->eof()) - return false; - - while (*in->position() != '\n') - ++in->position(); - ++in->position(); - - if (unlikely(*in->position() == '\n')) - { - /// An empty string. It is permissible, but it is unclear why. - ++in->position(); - } - else - readAndParseType(header["descr"], *in, columns, shape); - - return true; -} - -void NpyRowInputFormat::syncAfterError() -{ - skipToUnescapedNextLineOrEOF(*in); -} - -void NpyRowInputFormat::resetParser() -{ - IRowInputFormat::resetParser(); - read_columns.clear(); - seen_columns.clear(); - name_buf.clear(); -} - -size_t NpyRowInputFormat::countRows(size_t max_block_size) -{ - size_t num_rows = 0; - while (!in->eof() && num_rows < max_block_size) - { - skipToUnescapedNextLineOrEOF(*in); - ++num_rows; - } - - return num_rows; -} - -NpySchemaReader::NpySchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_) - : IRowWithNamesSchemaReader(in_, format_settings_, getDefaultDataTypeForEscapingRule(FormatSettings::EscapingRule::Escaped)) -{ -} - -[[maybe_unused]]static size_t readNpySize(ReadBuffer & in) -{ - NpySizeT size; - readBinaryLittleEndian(size, in); - return size; -} - -[[maybe_unused]]static String readNpyHeader(ReadBuffer & in) -{ - String header; - readBinary(header, in); - return header; -} - std::vector parseShape(String shapeString) { shapeString.erase(std::remove(shapeString.begin(), shapeString.end(), '('), shapeString.end()); @@ -226,39 +126,346 @@ std::vector parseShape(String shapeString) return shape; } -void NpyRowInputFormat::readPrefix() +std::unordered_map parseHeader(ReadBuffer &buf) { - const char * begin_pos = find_first_symbols<'\''>(in->position(), in->buffer().end()); - String text(begin_pos); - std::unordered_map header_map; + /// Check magic bytes + const char * magic_string = "\x93NUMPY"; + assertString(magic_string, buf); - // Finding fortran_order - size_t loc1 = text.find("fortran_order"); - if (loc1 == std::string::npos) - throw Exception(ErrorCodes::INCORRECT_DATA, "failed to find header keyword 'fortran_order'"); - header_map["fortran_order"] = (text.substr(loc1+16, 4) == "True" ? "true" : "false"); + /// Read npy version. + UInt8 version_major; + UInt8 version_minor; + readBinary(version_major, buf); + readBinary(version_minor, buf); - // Finding shape - loc1 = text.find('('); - size_t loc2 = text.find(')'); - if (loc1 == std::string::npos || loc2 == std::string::npos) - throw Exception(ErrorCodes::INCORRECT_DATA, "failed to find header keyword '(' or ')'"); - header_map["shape"] = text.substr(loc1, loc2 - loc1 + 1); + /// Read header length. + UInt32 header_length; + /// In v1 header length is 2 bytes, in v2 - 4 bytes. + if (version_major == 1) + { + UInt16 header_length_u16; + readBinaryLittleEndian(header_length_u16, buf); + header_length = header_length_u16; + } + else + { + readBinaryLittleEndian(header_length, buf); + } - // Finding descr - loc1 = text.find("descr"); - loc2 = loc1 + 9; - while (text[loc2] != '\'') - loc2++; - if (loc1 == std::string::npos) - throw Exception(ErrorCodes::INCORRECT_DATA, "failed to find header keyword 'descr'"); - header_map["descr"] = (text.substr(loc1+9, loc2 - loc1 - 9)); + /// Remember current count of read bytes to skip remaining + /// bytes in header when we find all required fields. + size_t header_start = buf.count(); - header = header_map; - shape = parseShape(header_map["shape"]); + /// Start parsing header. + String shape; + String descr; + + assertChar('{', buf); + skipWhitespaceIfAny(buf); + bool first = true; + while (!checkChar('}', buf)) + { + /// Skip delimiter between key-value pairs. + if (!first) + { + skipWhitespaceIfAny(buf); + } + else + { + first = false; + } + + /// Read map key. + String key; + readQuotedField(key, buf); + assertChar(':', buf); + skipWhitespaceIfAny(buf); + /// Read map value. + String value; + readQuotedField(value, buf); + assertChar(',', buf); + skipWhitespaceIfAny(buf); + + if (key == "'descr'") + descr = value; + else if (key == "'fortran_order'") + { + if (value != "false") + throw Exception(ErrorCodes::INCORRECT_DATA, "Fortran order is not supported"); + } + else if (key == "'shape'") + shape = value; + } + + if (shape.empty() || descr.empty()) + throw Exception(ErrorCodes::INCORRECT_DATA, "npy file header doesn't contain required field 'shape' or 'descr'"); + + size_t read_bytes = buf.count() - header_start; + if (read_bytes > header_length) + throw Exception(ErrorCodes::INCORRECT_DATA, "Header size is incorrect"); + + /// Ignore remaining header data. + buf.ignore(header_length - read_bytes); + + if (descr[0] == '\'') + descr = descr.substr(1, descr.length() - 1); + if (descr[descr.length() - 1] == '\'') + descr = descr.substr(0, descr.length() - 1); + + if (shape[0] == '\'') + shape = shape.substr(1, shape.length() - 1); + if (shape[shape.length() - 1] == '\'') + shape = shape.substr(0, shape.length() - 1); + + std::unordered_map header_data; + header_data["shape"] = shape; + header_data["descr"] = descr; + + return header_data; } -NamesAndTypesList NpySchemaReader::readRowAndGetNamesAndDataTypes(bool & eof) +NpyRowInputFormat::NpyRowInputFormat(ReadBuffer & in_, Block header_, Params params_) + : IRowInputFormat(std::move(header_), in_, std::move(params_)) +{ + header = parseHeader(*in); + endian = endianOrientation(header["descr"]); + nestedType = parseType(header["descr"]); +} + +void NpyRowInputFormat::readRows(MutableColumns & columns) +{ + auto & column = columns[0]; + IColumn * current_column = column.get(); + size_t total_elements_to_read = 1; + for (size_t i = 1; i != shape.size() - 1; ++i) + { + total_elements_to_read *= shape[i]; + auto & array_column = assert_cast(*column); + /// Fill offsets of array columns. + array_column.getOffsets().push_back(shape[i]); + current_column = &array_column.getData(); + } + + for (int i = 0; i != shape[0]; ++i) + { + for (size_t j = 0; j != total_elements_to_read; ++j) + readValueAndinsertIntoColumn(*current_column); + auto a = ColumnArray::create(current_column->getPtr()); + columns.push_back(a->getPtr()); + } +} + +void NpyRowInputFormat::readValueAndinsertIntoColumn(IColumn& column) +{ + if (header["descr"] == "position() != '\n') + ++in->position(); + ++in->position(); + size_t total_size = 1; + for (int dim_size : shape) + total_size *= dim_size; + + for (size_t i = 0; i < total_size; i++) + { + if (in->eof()) + { + throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Unexpected end of stream in Npy format"); + } + else if (*in->position() == '\t') + { + ++in->position(); + continue; + } + else if (*in->position() == '\n') + { + ++in->position(); + break; + } + + readRows(columns); + } +} + +bool NpyRowInputFormat::readRow([[maybe_unused]]MutableColumns & columns, RowReadExtension & /*ext*/) +{ + if (in->eof()) + return false; + + while (*in->position() != '\n') + ++in->position(); + ++in->position(); + + if (unlikely(*in->position() == '\n')) + { + /// An empty string. It is permissible, but it is unclear why. + ++in->position(); + } + else + readFromBuffer(columns); + + return true; +} + +void NpyRowInputFormat::resetParser() +{ + IRowInputFormat::resetParser(); + shape.clear(); +} + +NpySchemaReader::NpySchemaReader(ReadBuffer & in_) + : ISchemaReader(in_) {} + +NamesAndTypesList NpySchemaReader::readSchema() { if (first_row) { @@ -268,7 +475,6 @@ NamesAndTypesList NpySchemaReader::readRowAndGetNamesAndDataTypes(bool & eof) if (in.eof()) { - eof = true; return {}; } @@ -278,90 +484,33 @@ NamesAndTypesList NpySchemaReader::readRowAndGetNamesAndDataTypes(bool & eof) return {}; } - return {}; + auto header = parseHeader(in); + std::vector shape = parseShape(header["shape"]); + DataTypePtr nested_type = parseType(header["descr"]); + + DataTypePtr result_type = createDataType(shape.size(), nested_type); + + return {{"array", result_type}}; } -size_t nthSubstr(int n, const String& s, - const String& p) -{ - String::size_type i = s.find(p); // Find the first occurrence - - int j; - for (j = 1; j < n && i != String::npos; ++j) - i = s.find(p, i+1); // Find the next occurrence - - if (j == n) - return(i); - else - return(-1); -} - -// String NpySchemaReader::readHeader(bool & eof) -// { -// if (first_row) -// { -// skipBOMIfExists(in); -// first_row = false; -// } - -// if (in.eof()) -// { -// eof = true; -// return {}; -// } - -// if (*in.position() == '\n') -// { -// ++in.position(); -// return {}; -// } - -// // NamesAndTypesList names_and_types; -// StringRef name_ref; -// String name_buf; -// // readName(in, name_ref, name_buf); -// String text = String(name_ref); -// String res; - -// size_t pos = text.find('{'); -// std::map header; -// if (pos != String::npos) { -// // Find the closing curly brace. -// size_t end = text.find('}', pos + 1); -// if (end != String::npos) -// { -// // Get the text in curly braces. -// res = text.substr(pos + 1, end - pos - 1); - -// // Print the text in curly braces. -// } -// header["descr"] = res.substr(nthSubstr(1, res, "'descr':")+10, nthSubstr(1, res, "',")-2); -// } -// return text; -// } - void registerInputFormatNpy(FormatFactory & factory) { factory.registerInputFormat("npy", []( ReadBuffer & buf, const Block & sample, IRowInputFormat::Params params, - const FormatSettings & settings) + const FormatSettings &) { - return std::make_shared(buf, sample, std::move(params), settings); + return std::make_shared(buf, sample, std::move(params)); }); factory.markFormatSupportsSubsetOfColumns("npy"); } void registerNpySchemaReader(FormatFactory & factory) { - factory.registerSchemaReader("Npy", [](ReadBuffer & buf, const FormatSettings & settings) + factory.registerSchemaReader("Npy", [](ReadBuffer & buf, const FormatSettings &) { - return std::make_shared(buf, settings); - }); - factory.registerAdditionalInfoForSchemaCacheGetter("npy", [](const FormatSettings & settings) - { - return getAdditionalFormatInfoByEscapingRule(settings, FormatSettings::EscapingRule::Escaped); + return std::make_shared(buf); }); } diff --git a/src/Processors/Formats/Impl/NpyRowInputFormat.h b/src/Processors/Formats/Impl/NpyRowInputFormat.h index ea9673cd0db..6ff8c0dad59 100644 --- a/src/Processors/Formats/Impl/NpyRowInputFormat.h +++ b/src/Processors/Formats/Impl/NpyRowInputFormat.h @@ -1,14 +1,16 @@ #pragma once #include +#include #include #include #include #include #include -#include "Core/Field.h" -#include "Core/NamesAndTypes.h" -#include "Core/Types.h" +#include "Columns/IColumn.h" +#include +#include +#include using NpySizeT = uint32_t; static const uint8_t NPY_DOCUMENT_END = 0x00; @@ -18,63 +20,40 @@ namespace DB class ReadBuffer; - -/** Stream for reading data in TSKV format. - * TSKV is a very inefficient data format. - * Similar to TSV, but each field is written as key=value. - * Fields can be listed in any order (including, in different lines there may be different order), - * and some fields may be missing. - * An equal sign can be escaped in the field name. - * Also, as an additional element there may be a useless tskv fragment - it needs to be ignored. - */ class NpyRowInputFormat final : public IRowInputFormat { public: - NpyRowInputFormat(ReadBuffer & in_, Block header_, Params params_, const FormatSettings & format_settings_); + NpyRowInputFormat(ReadBuffer & in_, Block header_, Params params_); String getName() const override { return "NpyRowInputFormat"; } + void readFromBuffer(MutableColumns & /*columns*/); + void resetParser() override; private: - void readPrefix() override; bool readRow(MutableColumns & columns, RowReadExtension &) override; - bool allowSyncAfterError() const override { return true; } - void syncAfterError() override; + void readData(MutableColumns & columns); - bool supportsCountRows() const override { return true; } - size_t countRows(size_t max_block_size) override; + void readRows(MutableColumns & columns); - const FormatSettings format_settings; - - /// Buffer for the read from the stream the field name. Used when you have to copy it. - String name_buf; - - /// Hash table matching `field name -> position in the block`. NOTE You can use perfect hash map. - using NameMap = HashMap; - NameMap name_map; + void readValueAndinsertIntoColumn(IColumn& column); std::unordered_map header; - DataTypePtr data_type; std::vector shape; - - /// Set of columns for which the values were read. The rest will be filled with default values. - std::vector read_columns; - /// Set of columns which already met in row. Exception is thrown if there are more than one column with the same name. - std::vector seen_columns; - /// These sets may be different, because if null_as_default=1 read_columns[i] will be false and seen_columns[i] will be true - /// for row like ..., non-nullable column name=\N, ... + DataTypePtr nestedType; + int endian; }; -class NpySchemaReader : public IRowWithNamesSchemaReader +class NpySchemaReader : public ISchemaReader { public: - NpySchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_); + explicit NpySchemaReader(ReadBuffer & in_); std::unordered_map getHeader(); private: - NamesAndTypesList readRowAndGetNamesAndDataTypes([[maybe_unused]]bool & eof) override; + NamesAndTypesList readSchema() override; // NamesAndTypesList getDataTypesFromNpyDocument([[maybe_unused]]bool allow_to_skip_unsupported_types); // String readHeader(bool & eof);