From f8dae2bc76750daa268e5f0d49a8046f6a7ab1f7 Mon Sep 17 00:00:00 2001 From: Ivan Zhukov Date: Tue, 15 May 2018 18:30:46 +0300 Subject: [PATCH] Parquet input: get rid of redundant copying --- .../DataStreams/ParquetBlockInputStream.cpp | 134 ++++++++++++------ .../src/DataStreams/ParquetBlockInputStream.h | 17 ++- 2 files changed, 102 insertions(+), 49 deletions(-) diff --git a/dbms/src/DataStreams/ParquetBlockInputStream.cpp b/dbms/src/DataStreams/ParquetBlockInputStream.cpp index 0be7a55715b..c5bc17ccf14 100644 --- a/dbms/src/DataStreams/ParquetBlockInputStream.cpp +++ b/dbms/src/DataStreams/ParquetBlockInputStream.cpp @@ -4,6 +4,9 @@ // TODO: clear includes #include +#include +#include +#include #include #include #include @@ -36,16 +39,74 @@ Block ParquetBlockInputStream::getHeader() const return header; } - -void ParquetBlockInputStream::readData(const IDataType & type, IColumn & column, ReadBuffer & istr, size_t rows) +/// Inserts numeric data right into internal column data to reduce an overhead +template +void ParquetBlockInputStream::fillColumnWithNumericData(std::shared_ptr & arrow_column, MutableColumnPtr & internal_column) { - IDataType::InputStreamGetter input_stream_getter = [&] (const IDataType::SubstreamPath &) { return &istr; }; - type.deserializeBinaryBulkWithMultipleStreams(column, input_stream_getter, rows, /* avg_value_size_hint = */0, false, {}); + PaddedPODArray & column_data = static_cast &>(*internal_column).getData(); + column_data.reserve(arrow_column->length()); - if (column.size() != rows) - throw Exception("Cannot read all data in ParquetBlockInputStream.", ErrorCodes::CANNOT_READ_ALL_DATA); + for (size_t chunk_i = 0; chunk_i != static_cast(arrow_column->data()->num_chunks()); ++chunk_i) + { + std::shared_ptr chunk = arrow_column->data()->chunk(chunk_i); + /// buffers[0] is a null bitmap and buffers[1] are actual values + std::shared_ptr buffer = chunk->data()->buffers[1]; + + const NumericType * raw_data = reinterpret_cast(buffer->data()); + column_data.insert_assume_reserved(raw_data, raw_data + chunk->length()); + } } +/// Inserts chars and offsets right into internal column data to reduce an overhead. +/// Internal offsets are shifted by one to the right in comparison with Arrow ones. So the last offset should map to the end of all chars. +/// Also internal strings are null terminated. +void ParquetBlockInputStream::fillColumnWithStringData(std::shared_ptr & arrow_column, MutableColumnPtr & internal_column) +{ + PaddedPODArray & column_chars_t = static_cast(*internal_column).getChars(); + PaddedPODArray & column_offsets = static_cast(*internal_column).getOffsets(); + + size_t chars_t_size = 0; + for (size_t chunk_i = 0; chunk_i != static_cast(arrow_column->data()->num_chunks()); ++chunk_i) + { + arrow::BinaryArray & chunk = static_cast(*(arrow_column->data()->chunk(chunk_i))); + const size_t chunk_length = chunk.length(); + + chars_t_size += chunk.value_offset(chunk_length - 1) + chunk.value_length(chunk_length - 1); + chars_t_size += chunk_length; /// additional space for null bytes + } + + column_chars_t.reserve(chars_t_size); + column_offsets.reserve(arrow_column->length()); + + for (size_t chunk_i = 0; chunk_i != static_cast(arrow_column->data()->num_chunks()); ++chunk_i) + { + arrow::BinaryArray & chunk = static_cast(*(arrow_column->data()->chunk(chunk_i))); + std::shared_ptr buffer = chunk.value_data(); + const size_t chunk_length = chunk.length(); + + for (size_t offset_i = 0; offset_i != chunk_length; ++offset_i) + { + const UInt8 * raw_data = buffer->data() + chunk.value_offset(offset_i); + column_chars_t.insert_assume_reserved(raw_data, raw_data + chunk.value_length(offset_i)); + column_chars_t.emplace_back('\0'); + + column_offsets.emplace_back(column_chars_t.size()); + } + } +} + +#define FOR_ARROW_NUMERIC_TYPES(M) \ + M(arrow::Type::UINT8, UInt8) \ + M(arrow::Type::INT8, Int8) \ + M(arrow::Type::UINT16, UInt16) \ + M(arrow::Type::INT16, Int16) \ + M(arrow::Type::UINT32, UInt32) \ + M(arrow::Type::INT32, Int32) \ + M(arrow::Type::UINT64, UInt64) \ + M(arrow::Type::INT64, Int64) \ + M(arrow::Type::FLOAT, Float32) \ + M(arrow::Type::DOUBLE, Float64) + using NameToColumnPtr = std::unordered_map>; Block ParquetBlockInputStream::readImpl() @@ -87,9 +148,9 @@ Block ParquetBlockInputStream::readImpl() name_to_column_ptr[arrow_column->name()] = arrow_column; } - for (size_t i = 0; i != header.columns(); ++i) + for (size_t column_i = 0; column_i != header.columns(); ++column_i) { - ColumnWithTypeAndName header_column = header.getByPosition(i); + ColumnWithTypeAndName header_column = header.getByPosition(column_i); if (name_to_column_ptr.find(header_column.name) == name_to_column_ptr.end()) // TODO: What if some columns were not presented? Insert NULLs? What if a column is not nullable? @@ -102,21 +163,20 @@ Block ParquetBlockInputStream::readImpl() std::shared_ptr arrow_column = name_to_column_ptr[header_column.name]; arrow::Type::type arrow_type = arrow_column->type()->id(); - if (arrow_type_to_native_type.find(arrow_type) == arrow_type_to_native_type.end()) + if (arrow_type_to_internal_type.find(arrow_type) == arrow_type_to_internal_type.end()) { throw Exception("Unsupported type " + arrow_column->type()->name() + " of a column " + arrow_column->name()/*, ErrorCodes::TODO*/); } - - DataTypePtr native_type = arrow_type_to_native_type[arrow_type]; - if (header_column.type->getName() != native_type->getName()) + DataTypePtr internal_type = arrow_type_to_internal_type[arrow_type]; + if (header_column.type->getName() != internal_type->getName()) { - throw Exception("Input data type " + native_type->getName() + " for column \"" + header_column.name + "\" is not compatible with an actual type " + header_column.type->getName()); + throw Exception("Input data type " + internal_type->getName() + " for column \"" + header_column.name + "\" is not compatible with an actual type " + header_column.type->getName()); } ColumnWithTypeAndName column; column.name = header_column.name; - column.type = native_type; + column.type = internal_type; /// Data MutableColumnPtr read_column = column.type->createColumn(); @@ -125,38 +185,26 @@ Block ParquetBlockInputStream::readImpl() if (arrow::Type::STRING == arrow_type) { - for (size_t chunk_i = 0; chunk_i != static_cast(arrow_column->data()->num_chunks()); ++chunk_i) - { - arrow::StringArray & chunk = static_cast(*(arrow_column->data()->chunk(chunk_i))); - - /// buffers[0] is a null bitmap and buffers[1] are actual values - // TODO: need to recalculate the size if strings are null-terminated - size_t buf_sz = chunk.data()->buffers[1]->size() + (chunk.length() * sizeof(size_t)); - // TODO: naming - std::vector data(buf_sz); - WriteBuffer wb(data.data(), data.size()); - - for (size_t string_i = 0; string_i != static_cast(chunk.length()); ++string_i) - writeStringBinary(chunk.GetString(string_i), wb); - - ReadBufferFromMemory values_buffer(data.data(), data.size()); - size_t rows_num = chunk.length(); - readData(*column.type, *read_column, values_buffer, rows_num); - } + fillColumnWithStringData(arrow_column, read_column); } + // TODO: check that values smaller than INT32 are being read correctly +#define DISPATCH(ARROW_NUMERIC_TYPE, CPP_NUMERIC_TYPE) \ + else if (ARROW_NUMERIC_TYPE == arrow_type) \ + { \ + fillColumnWithNumericData(arrow_column, read_column); \ + } + + FOR_ARROW_NUMERIC_TYPES(DISPATCH) +#undef DISPATCH + + // TODO: arrow::Type::BOOLEAN + // TODO: arrow::Type::DATE32 + // TODO: arrow::Type::DATE64 + + // TODO: add other types else { - for (size_t chunk_i = 0; chunk_i != static_cast(arrow_column->data()->num_chunks()); ++chunk_i) - { - std::shared_ptr chunk = arrow_column->data()->chunk(chunk_i); - /// buffers[0] is a null bitmap and buffers[1] are actual values - std::shared_ptr buffer = chunk->data()->buffers[1]; - // TODO: make less copying? - ReadBufferFromMemory values_buffer(buffer->data(), buffer->size()); - size_t rows_num = chunk->length(); - - readData(*column.type, *read_column, values_buffer, rows_num); - } + throw Exception("Unsupported parquet type " + arrow_column->type()->name()/*, ErrorCodes::TODO*/); } column.column = std::move(read_column); diff --git a/dbms/src/DataStreams/ParquetBlockInputStream.h b/dbms/src/DataStreams/ParquetBlockInputStream.h index 0ab4d5f4ec3..e0bc42f5640 100644 --- a/dbms/src/DataStreams/ParquetBlockInputStream.h +++ b/dbms/src/DataStreams/ParquetBlockInputStream.h @@ -1,5 +1,7 @@ #pragma once +#include +#include #include #include #include @@ -20,8 +22,6 @@ public: String getName() const override { return "Parquet"; } Block getHeader() const override; - static void readData(const IDataType & type, IColumn & column, ReadBuffer & istr, size_t rows); - protected: Block readImpl() override; @@ -29,8 +29,12 @@ private: ReadBuffer & istr; Block header; - std::unordered_map> arrow_type_to_native_type = { - {arrow::Type::BOOL, std::make_shared()}, + template + void fillColumnWithNumericData(std::shared_ptr & arrow_column, MutableColumnPtr & internal_column); + + void fillColumnWithStringData(std::shared_ptr & arrow_column, MutableColumnPtr & internal_column); + + std::unordered_map> arrow_type_to_internal_type = { {arrow::Type::UINT8, std::make_shared()}, {arrow::Type::INT8, std::make_shared()}, {arrow::Type::UINT16, std::make_shared()}, @@ -41,9 +45,10 @@ private: {arrow::Type::INT64, std::make_shared()}, {arrow::Type::FLOAT, std::make_shared()}, {arrow::Type::DOUBLE, std::make_shared()}, - // TODO: + {arrow::Type::STRING, std::make_shared()}//, - /* {arrow::Type::BINARY, Binary, ByteArrayType}, */ + // TODO: + /* {arrow::Type::BOOL, std::make_shared()}, */ /* {arrow::Type::DATE32, Date32, Int32Type}, */ /* {arrow::Type::DATE64, Date64, Int32Type}//, */ // TODO: add other types