Parquet input: support a string type

This commit is contained in:
Ivan Zhukov 2018-05-14 01:34:27 +03:00
parent e5307e3c18
commit 282110cba4
2 changed files with 37 additions and 14 deletions

View File

@ -2,11 +2,13 @@
#include <iterator>
#include <vector>
// TODO: clean up includes
#include <Core/ColumnWithTypeAndName.h>
#include <DataStreams/ParquetBlockInputStream.h>
#include <DataTypes/DataTypeFactory.h>
#include <IO/BufferBase.h>
#include <IO/ReadBufferFromMemory.h>
#include <IO/WriteHelpers.h>
#include <ext/range.h>
#include <arrow/buffer.h>
@ -106,8 +108,6 @@ Block ParquetBlockInputStream::readImpl()
}
// TODO: support NULL values
DataTypePtr native_type = arrow_type_to_native_type[arrow_type];
if (header_column.type->getName() != native_type->getName())
{
@ -121,20 +121,43 @@ Block ParquetBlockInputStream::readImpl()
/// Data
MutableColumnPtr read_column = column.type->createColumn();
for (size_t chunk_i = 0; chunk_i != static_cast<size_t>(arrow_column->data()->num_chunks()); ++chunk_i)
// TODO: support NULL values
if (arrow::Type::STRING == arrow_type)
{
std::shared_ptr<arrow::Array> chunk = arrow_column->data()->chunk(chunk_i);
/// arrow::Array has two buffers: null bitmap and actual values
std::shared_ptr<arrow::Buffer> buffer = chunk->data()->buffers[1]; /// getting values
// TODO: make less copying?
ReadBufferFromMemory values_buffer(buffer->data(), buffer->size());
size_t rows_num = chunk->length();
for (size_t chunk_i = 0; chunk_i != static_cast<size_t>(arrow_column->data()->num_chunks()); ++chunk_i)
{
arrow::StringArray & chunk = static_cast<arrow::StringArray &>(*(arrow_column->data()->chunk(chunk_i)));
readData(*column.type, *read_column, values_buffer, rows_num);
/// buffers[0] is a null bitmap and buffers[1] are actual values
// TODO: need to recalculate the size if strings are null-terminated
size_t buf_sz = chunk.data()->buffers[1]->size() + (chunk.length() * sizeof(size_t));
// TODO: naming
std::vector<char> data(buf_sz);
WriteBuffer wb(data.data(), data.size());
for (size_t string_i = 0; string_i != static_cast<size_t>(chunk.length()); ++string_i)
writeStringBinary(chunk.GetString(string_i), wb);
ReadBufferFromMemory values_buffer(data.data(), data.size());
size_t rows_num = chunk.length();
readData(*column.type, *read_column, values_buffer, rows_num);
}
}
else
{
for (size_t chunk_i = 0; chunk_i != static_cast<size_t>(arrow_column->data()->num_chunks()); ++chunk_i)
{
std::shared_ptr<arrow::Array> chunk = arrow_column->data()->chunk(chunk_i);
/// buffers[0] is a null bitmap and buffers[1] are actual values
std::shared_ptr<arrow::Buffer> buffer = chunk->data()->buffers[1];
// TODO: make less copying?
ReadBufferFromMemory values_buffer(buffer->data(), buffer->size());
size_t rows_num = chunk->length();
// TODO: process a String type
// TODO: if (... == "String") { ... }
readData(*column.type, *read_column, values_buffer, rows_num);
}
}
column.column = std::move(read_column);
res.insert(std::move(column));

View File

@ -40,9 +40,9 @@ private:
{arrow::Type::UINT64, std::make_shared<DataTypeUInt64>()},
{arrow::Type::INT64, std::make_shared<DataTypeInt64>()},
{arrow::Type::FLOAT, std::make_shared<DataTypeFloat32>()},
{arrow::Type::DOUBLE, std::make_shared<DataTypeFloat64>()}//,
{arrow::Type::DOUBLE, std::make_shared<DataTypeFloat64>()},
// TODO:
/* {arrow::Type::STRING, std::make_shared<DataTypeString>()}//, */
{arrow::Type::STRING, std::make_shared<DataTypeString>()}//,
/* {arrow::Type::BINARY, Binary, ByteArrayType}, */
/* {arrow::Type::DATE32, Date32, Int32Type}, */
/* {arrow::Type::DATE64, Date64, Int32Type}//, */