new changes

This commit is contained in:
yariks5s 2023-10-18 18:02:05 +00:00
parent cb08da617f
commit 6dc88a4ca4
2 changed files with 415 additions and 287 deletions

View File

@ -1,7 +1,9 @@
#include <IO/ReadHelpers.h> #include <IO/ReadHelpers.h>
#include <cstddef>
#include <iterator> #include <iterator>
#include <memory> #include <memory>
#include <string> #include <string>
#include <tuple>
#include <vector> #include <vector>
#include <type_traits> #include <type_traits>
#include <unordered_map> #include <unordered_map>
@ -10,16 +12,20 @@
#include <Formats/EscapingRuleUtils.h> #include <Formats/EscapingRuleUtils.h>
#include <DataTypes/Serializations/SerializationNullable.h> #include <DataTypes/Serializations/SerializationNullable.h>
#include <DataTypes/DataTypeString.h> #include <DataTypes/DataTypeString.h>
#include "Common/Exception.h" #include <Common/Exception.h>
#include "Columns/IColumn.h" #include "Columns/ColumnArray.h"
#include "Core/Field.h" #include "Storages/IStorage.h"
#include "DataTypes/DataTypesNumber.h" #include <Columns/IColumn.h>
#include "DataTypes/IDataType.h" #include <Core/Field.h>
#include "DataTypes/Serializations/ISerialization.h" #include <Core/NamesAndTypes.h>
#include "IO/ReadBuffer.h" #include <DataTypes/DataTypeArray.h>
#include "IO/WriteHelpers.h" #include <DataTypes/DataTypesNumber.h>
#include "Processors/Formats/IRowInputFormat.h" #include <DataTypes/IDataType.h>
#include "base/types.h" #include <DataTypes/Serializations/ISerialization.h>
#include <IO/ReadBuffer.h>
#include <IO/WriteHelpers.h>
#include <Processors/Formats/IRowInputFormat.h>
#include <base/types.h>
namespace DB namespace DB
@ -34,179 +40,73 @@ namespace ErrorCodes
} }
NpyRowInputFormat::NpyRowInputFormat(ReadBuffer & in_, Block header_, Params params_, const FormatSettings & format_settings_) DataTypePtr createDataType(size_t depth, DataTypePtr nested_type)
: IRowInputFormat(std::move(header_), in_, std::move(params_)), format_settings(format_settings_), name_map(getPort().getHeader().columns())
{ {
const auto & sample_block = getPort().getHeader(); DataTypePtr result_type = nested_type;
size_t num_columns = sample_block.columns();
for (size_t i = 0; i < num_columns; ++i) assert(depth > 1);
name_map[sample_block.getByPosition(i).name] = i; for (size_t i = 0; i < depth - 1; ++i)
result_type = std::make_shared<DataTypeArray>(std::move(result_type));
return result_type;
} }
template <typename T> /*
void readFromBuffer(ReadBuffer &in, MutableColumns & /*columns*/, std::vector<int> shape) Checks, in what endian format data was written.
{ return -1: if data is written in little-endian;
while (*in.position() != '\n')
++in.position();
++in.position();
size_t total_size = 1;
for (int dim_size : shape)
total_size *= dim_size;
for (size_t i = 0; i < total_size; i++) 1: if data is written in big-endian;
{
if (in.eof()) 0: if data is written in no-endian. */
{ int endianOrientation(String descr)
throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Unexpected end of stream in Npy format"); {
} if (descr.length() < 3)
else if (*in.position() == '\t') throw Exception(ErrorCodes::BAD_ARGUMENTS, "Descr field length must be bigger or equal 3");
{ if (descr[0] == '<')
++in.position(); return -1;
continue; else if (descr[0] == '>')
} return 1;
else if (*in.position() == '\n') else if (descr[0] == '|')
{ return 0;
++in.position(); else
break; throw Exception(ErrorCodes::BAD_ARGUMENTS, "Wrong content of field descr");
}
T value;
readBinaryLittleEndian(value, in);
}
} }
template <typename T> DataTypePtr parseType(String type)
void readStringFromBuffer(ReadBuffer &in, std::vector<int> shape)
{
while (*in.position() != '\n')
++in.position();
size_t total_size = 1;
for (int dim_size : shape)
total_size *= dim_size;
for (size_t i = 0; i < total_size; i++)
{
if (in.eof())
{
throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Unexpected end of stream in Npy format");
}
else if (*in.position() == '\t')
{
++in.position();
continue;
}
// else if (*in.position() == '\n')
// {
// ++in.position();
// break;
// }
T value;
readStringBinary(value, in);
std::cout << value << std::endl;
}
}
void readAndParseType(String type, ReadBuffer &in, MutableColumns & columns, std::vector<int> shape) //is ok
{ {
if (type == "<i1") if (type == "<i1")
readFromBuffer<Int8>(in, columns, shape); return std::make_shared<DataTypeInt8>();
else if (type == "<i2") else if (type == "<i2")
readFromBuffer<Int16>(in, columns, shape); return std::make_shared<DataTypeInt16>();
else if (type == "<i4") else if (type == "<i4")
readFromBuffer<Int32>(in, columns, shape); return std::make_shared<DataTypeInt32>();
else if (type == "<i8") else if (type == "<i8")
readFromBuffer<Int64>(in, columns, shape); return std::make_shared<DataTypeInt64>();
else if (type == "<u1") else if (type == "<u1")
readFromBuffer<UInt8>(in, columns, shape); return std::make_shared<DataTypeUInt8>();
else if (type == "<u2") else if (type == "<u2")
readFromBuffer<UInt16>(in, columns, shape); return std::make_shared<DataTypeUInt16>();
else if (type == "<u4") else if (type == "<u4")
readFromBuffer<UInt32>(in, columns, shape); return std::make_shared<DataTypeUInt32>();
else if (type == "<u8") else if (type == "<u8")
readFromBuffer<UInt64>(in, columns, shape); return std::make_shared<DataTypeUInt64>();
else if (type == "<f2") else if (type == "<f2")
readFromBuffer<Float32>(in, columns, shape); return std::make_shared<DataTypeFloat32>();
else if (type == "<f4") else if (type == "<f4")
readFromBuffer<Float32>(in, columns, shape); return std::make_shared<DataTypeFloat32>();
else if (type == "<f8") else if (type == "<f8")
readFromBuffer<Float64>(in, columns, shape); return std::make_shared<DataTypeFloat64>();
else if (type == "<c8" || type == "<c16") else if (type == "<c8" || type == "<c16")
throw Exception(ErrorCodes::BAD_ARGUMENTS, "ClickHouse doesn't support complex numeric type"); throw Exception(ErrorCodes::BAD_ARGUMENTS, "ClickHouse doesn't support complex numeric type");
else if (type == "|b1") else if (type == "|b1")
readFromBuffer<Int8>(in, columns, shape); return std::make_shared<DataTypeInt8>();
else if (type == "<U10" || type == "<U20" || type == "<U21") else if (type == "<U10" || type == "<U20" || type == "<U21")
readStringFromBuffer<String>(in, shape); return std::make_shared<DataTypeString>();
else if (type == "O") else if (type == "O")
throw Exception(ErrorCodes::BAD_ARGUMENTS, "ClickHouse doesn't support object types"); throw Exception(ErrorCodes::BAD_ARGUMENTS, "ClickHouse doesn't support object types");
else else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Error while parsing data type"); throw Exception(ErrorCodes::BAD_ARGUMENTS, "Error while parsing data type");
} }
bool NpyRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & /*ext*/)
{
if (in->eof())
return false;
while (*in->position() != '\n')
++in->position();
++in->position();
if (unlikely(*in->position() == '\n'))
{
/// An empty string. It is permissible, but it is unclear why.
++in->position();
}
else
readAndParseType(header["descr"], *in, columns, shape);
return true;
}
void NpyRowInputFormat::syncAfterError()
{
skipToUnescapedNextLineOrEOF(*in);
}
void NpyRowInputFormat::resetParser()
{
IRowInputFormat::resetParser();
read_columns.clear();
seen_columns.clear();
name_buf.clear();
}
size_t NpyRowInputFormat::countRows(size_t max_block_size)
{
size_t num_rows = 0;
while (!in->eof() && num_rows < max_block_size)
{
skipToUnescapedNextLineOrEOF(*in);
++num_rows;
}
return num_rows;
}
NpySchemaReader::NpySchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_)
: IRowWithNamesSchemaReader(in_, format_settings_, getDefaultDataTypeForEscapingRule(FormatSettings::EscapingRule::Escaped))
{
}
[[maybe_unused]]static size_t readNpySize(ReadBuffer & in)
{
NpySizeT size;
readBinaryLittleEndian(size, in);
return size;
}
[[maybe_unused]]static String readNpyHeader(ReadBuffer & in)
{
String header;
readBinary(header, in);
return header;
}
std::vector<int> parseShape(String shapeString) std::vector<int> parseShape(String shapeString)
{ {
shapeString.erase(std::remove(shapeString.begin(), shapeString.end(), '('), shapeString.end()); shapeString.erase(std::remove(shapeString.begin(), shapeString.end(), '('), shapeString.end());
@ -226,39 +126,346 @@ std::vector<int> parseShape(String shapeString)
return shape; return shape;
} }
void NpyRowInputFormat::readPrefix() std::unordered_map<String, String> parseHeader(ReadBuffer &buf)
{ {
const char * begin_pos = find_first_symbols<'\''>(in->position(), in->buffer().end()); /// Check magic bytes
String text(begin_pos); const char * magic_string = "\x93NUMPY";
std::unordered_map<String, String> header_map; assertString(magic_string, buf);
// Finding fortran_order /// Read npy version.
size_t loc1 = text.find("fortran_order"); UInt8 version_major;
if (loc1 == std::string::npos) UInt8 version_minor;
throw Exception(ErrorCodes::INCORRECT_DATA, "failed to find header keyword 'fortran_order'"); readBinary(version_major, buf);
header_map["fortran_order"] = (text.substr(loc1+16, 4) == "True" ? "true" : "false"); readBinary(version_minor, buf);
// Finding shape /// Read header length.
loc1 = text.find('('); UInt32 header_length;
size_t loc2 = text.find(')'); /// In v1 header length is 2 bytes, in v2 - 4 bytes.
if (loc1 == std::string::npos || loc2 == std::string::npos) if (version_major == 1)
throw Exception(ErrorCodes::INCORRECT_DATA, "failed to find header keyword '(' or ')'"); {
header_map["shape"] = text.substr(loc1, loc2 - loc1 + 1); UInt16 header_length_u16;
readBinaryLittleEndian(header_length_u16, buf);
header_length = header_length_u16;
}
else
{
readBinaryLittleEndian(header_length, buf);
}
// Finding descr /// Remember current count of read bytes to skip remaining
loc1 = text.find("descr"); /// bytes in header when we find all required fields.
loc2 = loc1 + 9; size_t header_start = buf.count();
while (text[loc2] != '\'')
loc2++;
if (loc1 == std::string::npos)
throw Exception(ErrorCodes::INCORRECT_DATA, "failed to find header keyword 'descr'");
header_map["descr"] = (text.substr(loc1+9, loc2 - loc1 - 9));
header = header_map; /// Start parsing header.
shape = parseShape(header_map["shape"]); String shape;
String descr;
assertChar('{', buf);
skipWhitespaceIfAny(buf);
bool first = true;
while (!checkChar('}', buf))
{
/// Skip delimiter between key-value pairs.
if (!first)
{
skipWhitespaceIfAny(buf);
}
else
{
first = false;
}
/// Read map key.
String key;
readQuotedField(key, buf);
assertChar(':', buf);
skipWhitespaceIfAny(buf);
/// Read map value.
String value;
readQuotedField(value, buf);
assertChar(',', buf);
skipWhitespaceIfAny(buf);
if (key == "'descr'")
descr = value;
else if (key == "'fortran_order'")
{
if (value != "false")
throw Exception(ErrorCodes::INCORRECT_DATA, "Fortran order is not supported");
}
else if (key == "'shape'")
shape = value;
}
if (shape.empty() || descr.empty())
throw Exception(ErrorCodes::INCORRECT_DATA, "npy file header doesn't contain required field 'shape' or 'descr'");
size_t read_bytes = buf.count() - header_start;
if (read_bytes > header_length)
throw Exception(ErrorCodes::INCORRECT_DATA, "Header size is incorrect");
/// Ignore remaining header data.
buf.ignore(header_length - read_bytes);
if (descr[0] == '\'')
descr = descr.substr(1, descr.length() - 1);
if (descr[descr.length() - 1] == '\'')
descr = descr.substr(0, descr.length() - 1);
if (shape[0] == '\'')
shape = shape.substr(1, shape.length() - 1);
if (shape[shape.length() - 1] == '\'')
shape = shape.substr(0, shape.length() - 1);
std::unordered_map<String, String> header_data;
header_data["shape"] = shape;
header_data["descr"] = descr;
return header_data;
} }
NamesAndTypesList NpySchemaReader::readRowAndGetNamesAndDataTypes(bool & eof) NpyRowInputFormat::NpyRowInputFormat(ReadBuffer & in_, Block header_, Params params_)
: IRowInputFormat(std::move(header_), in_, std::move(params_))
{
header = parseHeader(*in);
endian = endianOrientation(header["descr"]);
nestedType = parseType(header["descr"]);
}
void NpyRowInputFormat::readRows(MutableColumns & columns)
{
auto & column = columns[0];
IColumn * current_column = column.get();
size_t total_elements_to_read = 1;
for (size_t i = 1; i != shape.size() - 1; ++i)
{
total_elements_to_read *= shape[i];
auto & array_column = assert_cast<ColumnArray &>(*column);
/// Fill offsets of array columns.
array_column.getOffsets().push_back(shape[i]);
current_column = &array_column.getData();
}
for (int i = 0; i != shape[0]; ++i)
{
for (size_t j = 0; j != total_elements_to_read; ++j)
readValueAndinsertIntoColumn(*current_column);
auto a = ColumnArray::create(current_column->getPtr());
columns.push_back(a->getPtr());
}
}
void NpyRowInputFormat::readValueAndinsertIntoColumn(IColumn& column)
{
if (header["descr"] == "<i1")
{
DataTypeInt8 value;
if (endian == -1)
readBinaryLittleEndian(value, *in);
else if (endian == 1)
readBinaryBigEndian(value, *in);
// else if (endian == 0)
// readBinary(value, *in);
column.insertData(value);
}
else if (header["descr"] == "<i2")
{
DataTypeInt16 value;
if (endian == -1)
readBinaryLittleEndian(value, *in);
else if (endian == 1)
readBinaryBigEndian(value, *in);
// else if (endian == 0)
// readBinary(value, *in);
column.insertData(value);
}
else if (header["descr"] == "<i4")
{
DataTypeInt32 value;
if (endian == -1)
readBinaryLittleEndian(value, *in);
else if (endian == 1)
readBinaryBigEndian(value, *in);
// else if (endian == 0)
// readBinary(value, *in);
column.insertData(value);
}
else if (header["descr"] == "<i8")
{
DataTypeInt64 value;
if (endian == -1)
readBinaryLittleEndian(value, *in);
else if (endian == 1)
readBinaryBigEndian(value, *in);
// else if (endian == 0)
// readBinary(value, *in);
column.insertData(value);
}
else if (header["descr"] == "<u1")
{
DataTypeUInt8 value;
if (endian == -1)
readBinaryLittleEndian(value, *in);
else if (endian == 1)
readBinaryBigEndian(value, *in);
// else if (endian == 0)
// readBinary(value, *in);
column.insertData(value);
}
else if (header["descr"] == "<u2")
{
DataTypeUInt16 value;
if (endian == -1)
readBinaryLittleEndian(value, *in);
else if (endian == 1)
readBinaryBigEndian(value, *in);
// else if (endian == 0)
// readBinary(value, *in);
column.insertData(value);
}
else if (header["descr"] == "<u4")
{
DataTypeUInt32 value;
if (endian == -1)
readBinaryLittleEndian(value, *in);
else if (endian == 1)
readBinaryBigEndian(value, *in);
// else if (endian == 0)
// readBinary(value, *in);
column.insertData(value);
}
else if (header["descr"] == "<u8")
{
DataTypeUInt64 value;
if (endian == -1)
readBinaryLittleEndian(value, *in);
else if (endian == 1)
readBinaryBigEndian(value, *in);
// else if (endian == 0)
// readBinary(value, *in);
column.insertData(value);
}
else if (header["descr"] == "<f2")
{
DataTypeFloat32 value;
if (endian == -1)
readBinaryLittleEndian(value, *in);
else if (endian == 1)
readBinaryBigEndian(value, *in);
// else if (endian == 0)
// readBinary(value, *in);
column.insertData(value);
}
else if (header["descr"] == "<f4")
{
DataTypeFloat32 value;
if (endian == -1)
readBinaryLittleEndian(value, *in);
else if (endian == 1)
readBinaryBigEndian(value, *in);
// else if (endian == 0)
// readBinary(value, *in);
column.insertData(value);
} /// we dont support size of one of floats here.
else if (header["descr"] == "<f8")
{
DataTypeFloat64 value;
if (endian == -1)
readBinaryLittleEndian(value, *in);
else if (endian == 1)
readBinaryBigEndian(value, *in);
// else if (endian == 0)
// readBinary(value, *in);
column.insertData(value);
}
else if (header["descr"] == "<c8" || header["descr"] == "<c16")
throw Exception(ErrorCodes::BAD_ARGUMENTS, "ClickHouse doesn't support complex numeric type");
else if (header["descr"] == "|b1")
{
DataTypeInt8 value;
if (endian == -1)
readBinaryLittleEndian(value, *in);
else if (endian == 1)
readBinaryBigEndian(value, *in);
// else if (endian == 0)
// readBinary(value, *in);
column.insertData(value);
} /// Not sure that its good idea
else if (header["descr"] == "<U10" || header["descr"] == "<U20" || header["descr"] == "<U21")
{
String value;
if (endian == -1)
readStringBinary(value, *in);
column.insert(value);
}
else if (header["descr"] == "O")
throw Exception(ErrorCodes::BAD_ARGUMENTS, "ClickHouse doesn't support object types");
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Error while parsing data type");
}
void NpyRowInputFormat::readFromBuffer(MutableColumns & columns)
{
while (*in->position() != '\n')
++in->position();
++in->position();
size_t total_size = 1;
for (int dim_size : shape)
total_size *= dim_size;
for (size_t i = 0; i < total_size; i++)
{
if (in->eof())
{
throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Unexpected end of stream in Npy format");
}
else if (*in->position() == '\t')
{
++in->position();
continue;
}
else if (*in->position() == '\n')
{
++in->position();
break;
}
readRows(columns);
}
}
bool NpyRowInputFormat::readRow([[maybe_unused]]MutableColumns & columns, RowReadExtension & /*ext*/)
{
if (in->eof())
return false;
while (*in->position() != '\n')
++in->position();
++in->position();
if (unlikely(*in->position() == '\n'))
{
/// An empty string. It is permissible, but it is unclear why.
++in->position();
}
else
readFromBuffer(columns);
return true;
}
void NpyRowInputFormat::resetParser()
{
IRowInputFormat::resetParser();
shape.clear();
}
NpySchemaReader::NpySchemaReader(ReadBuffer & in_)
: ISchemaReader(in_) {}
NamesAndTypesList NpySchemaReader::readSchema()
{ {
if (first_row) if (first_row)
{ {
@ -268,7 +475,6 @@ NamesAndTypesList NpySchemaReader::readRowAndGetNamesAndDataTypes(bool & eof)
if (in.eof()) if (in.eof())
{ {
eof = true;
return {}; return {};
} }
@ -278,90 +484,33 @@ NamesAndTypesList NpySchemaReader::readRowAndGetNamesAndDataTypes(bool & eof)
return {}; return {};
} }
return {}; auto header = parseHeader(in);
std::vector<int> shape = parseShape(header["shape"]);
DataTypePtr nested_type = parseType(header["descr"]);
DataTypePtr result_type = createDataType(shape.size(), nested_type);
return {{"array", result_type}};
} }
size_t nthSubstr(int n, const String& s,
const String& p)
{
String::size_type i = s.find(p); // Find the first occurrence
int j;
for (j = 1; j < n && i != String::npos; ++j)
i = s.find(p, i+1); // Find the next occurrence
if (j == n)
return(i);
else
return(-1);
}
// String NpySchemaReader::readHeader(bool & eof)
// {
// if (first_row)
// {
// skipBOMIfExists(in);
// first_row = false;
// }
// if (in.eof())
// {
// eof = true;
// return {};
// }
// if (*in.position() == '\n')
// {
// ++in.position();
// return {};
// }
// // NamesAndTypesList names_and_types;
// StringRef name_ref;
// String name_buf;
// // readName(in, name_ref, name_buf);
// String text = String(name_ref);
// String res;
// size_t pos = text.find('{');
// std::map<String, String> header;
// if (pos != String::npos) {
// // Find the closing curly brace.
// size_t end = text.find('}', pos + 1);
// if (end != String::npos)
// {
// // Get the text in curly braces.
// res = text.substr(pos + 1, end - pos - 1);
// // Print the text in curly braces.
// }
// header["descr"] = res.substr(nthSubstr(1, res, "'descr':")+10, nthSubstr(1, res, "',")-2);
// }
// return text;
// }
void registerInputFormatNpy(FormatFactory & factory) void registerInputFormatNpy(FormatFactory & factory)
{ {
factory.registerInputFormat("npy", []( factory.registerInputFormat("npy", [](
ReadBuffer & buf, ReadBuffer & buf,
const Block & sample, const Block & sample,
IRowInputFormat::Params params, IRowInputFormat::Params params,
const FormatSettings & settings) const FormatSettings &)
{ {
return std::make_shared<NpyRowInputFormat>(buf, sample, std::move(params), settings); return std::make_shared<NpyRowInputFormat>(buf, sample, std::move(params));
}); });
factory.markFormatSupportsSubsetOfColumns("npy"); factory.markFormatSupportsSubsetOfColumns("npy");
} }
void registerNpySchemaReader(FormatFactory & factory) void registerNpySchemaReader(FormatFactory & factory)
{ {
factory.registerSchemaReader("Npy", [](ReadBuffer & buf, const FormatSettings & settings) factory.registerSchemaReader("Npy", [](ReadBuffer & buf, const FormatSettings &)
{ {
return std::make_shared<NpySchemaReader>(buf, settings); return std::make_shared<NpySchemaReader>(buf);
});
factory.registerAdditionalInfoForSchemaCacheGetter("npy", [](const FormatSettings & settings)
{
return getAdditionalFormatInfoByEscapingRule(settings, FormatSettings::EscapingRule::Escaped);
}); });
} }

View File

@ -1,14 +1,16 @@
#pragma once #pragma once
#include <unordered_map> #include <unordered_map>
#include <vector>
#include <Core/Block.h> #include <Core/Block.h>
#include <Processors/Formats/IRowInputFormat.h> #include <Processors/Formats/IRowInputFormat.h>
#include <Processors/Formats/ISchemaReader.h> #include <Processors/Formats/ISchemaReader.h>
#include <Formats/FormatSettings.h> #include <Formats/FormatSettings.h>
#include <Common/HashTable/HashMap.h> #include <Common/HashTable/HashMap.h>
#include "Core/Field.h" #include "Columns/IColumn.h"
#include "Core/NamesAndTypes.h" #include <Core/Field.h>
#include "Core/Types.h" #include <Core/NamesAndTypes.h>
#include <Core/Types.h>
using NpySizeT = uint32_t; using NpySizeT = uint32_t;
static const uint8_t NPY_DOCUMENT_END = 0x00; static const uint8_t NPY_DOCUMENT_END = 0x00;
@ -18,63 +20,40 @@ namespace DB
class ReadBuffer; class ReadBuffer;
/** Stream for reading data in TSKV format.
* TSKV is a very inefficient data format.
* Similar to TSV, but each field is written as key=value.
* Fields can be listed in any order (including, in different lines there may be different order),
* and some fields may be missing.
* An equal sign can be escaped in the field name.
* Also, as an additional element there may be a useless tskv fragment - it needs to be ignored.
*/
class NpyRowInputFormat final : public IRowInputFormat class NpyRowInputFormat final : public IRowInputFormat
{ {
public: public:
NpyRowInputFormat(ReadBuffer & in_, Block header_, Params params_, const FormatSettings & format_settings_); NpyRowInputFormat(ReadBuffer & in_, Block header_, Params params_);
String getName() const override { return "NpyRowInputFormat"; } String getName() const override { return "NpyRowInputFormat"; }
void readFromBuffer(MutableColumns & /*columns*/);
void resetParser() override; void resetParser() override;
private: private:
void readPrefix() override;
bool readRow(MutableColumns & columns, RowReadExtension &) override; bool readRow(MutableColumns & columns, RowReadExtension &) override;
bool allowSyncAfterError() const override { return true; } void readData(MutableColumns & columns);
void syncAfterError() override;
bool supportsCountRows() const override { return true; } void readRows(MutableColumns & columns);
size_t countRows(size_t max_block_size) override;
const FormatSettings format_settings; void readValueAndinsertIntoColumn(IColumn& column);
/// Buffer for the read from the stream the field name. Used when you have to copy it.
String name_buf;
/// Hash table matching `field name -> position in the block`. NOTE You can use perfect hash map.
using NameMap = HashMap<StringRef, size_t, StringRefHash>;
NameMap name_map;
std::unordered_map<String, String> header; std::unordered_map<String, String> header;
DataTypePtr data_type;
std::vector<int> shape; std::vector<int> shape;
DataTypePtr nestedType;
/// Set of columns for which the values were read. The rest will be filled with default values. int endian;
std::vector<UInt8> read_columns;
/// Set of columns which already met in row. Exception is thrown if there are more than one column with the same name.
std::vector<UInt8> seen_columns;
/// These sets may be different, because if null_as_default=1 read_columns[i] will be false and seen_columns[i] will be true
/// for row like ..., non-nullable column name=\N, ...
}; };
class NpySchemaReader : public IRowWithNamesSchemaReader class NpySchemaReader : public ISchemaReader
{ {
public: public:
NpySchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_); explicit NpySchemaReader(ReadBuffer & in_);
std::unordered_map<String, String> getHeader(); std::unordered_map<String, String> getHeader();
private: private:
NamesAndTypesList readRowAndGetNamesAndDataTypes([[maybe_unused]]bool & eof) override; NamesAndTypesList readSchema() override;
// NamesAndTypesList getDataTypesFromNpyDocument([[maybe_unused]]bool allow_to_skip_unsupported_types); // NamesAndTypesList getDataTypesFromNpyDocument([[maybe_unused]]bool allow_to_skip_unsupported_types);
// String readHeader(bool & eof); // String readHeader(bool & eof);