From 0111969eaabb6384e485c83f95c600d5fcc1cb7b Mon Sep 17 00:00:00 2001 From: Ivan Zhukov Date: Fri, 25 May 2018 00:33:12 +0300 Subject: [PATCH] Check arrow status --- .../DataStreams/ParquetBlockInputStream.cpp | 16 ++--- .../DataStreams/ParquetBlockOutputStream.cpp | 69 +++++++++++++++---- 2 files changed, 62 insertions(+), 23 deletions(-) diff --git a/dbms/src/DataStreams/ParquetBlockInputStream.cpp b/dbms/src/DataStreams/ParquetBlockInputStream.cpp index fd837c00990..34fcf5c635a 100644 --- a/dbms/src/DataStreams/ParquetBlockInputStream.cpp +++ b/dbms/src/DataStreams/ParquetBlockInputStream.cpp @@ -175,10 +175,12 @@ Block ParquetBlockInputStream::readImpl() // TODO: maybe use parquet::RandomAccessSource? auto reader = parquet::ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer)); parquet::arrow::FileReader filereader(::arrow::default_memory_pool(), std::move(reader)); - std::shared_ptr table; - // TODO: Use an internal Exception? - PARQUET_THROW_NOT_OK(filereader.ReadTable(&table)); + + // TODO: also catch a ParquetException? + arrow::Status read_status = filereader.ReadTable(&table); + if (!read_status.ok()) + throw Exception("Error while reading parquet data: " + read_status.ToString()/*, ErrorCodes::TODO*/); if (0 == table->num_rows()) throw Exception("Empty table in input data"/*, ErrorCodes::TODO*/); @@ -241,8 +243,6 @@ Block ParquetBlockInputStream::readImpl() case arrow::Type::DATE32: fillColumnWithDate32Data(arrow_column, read_column); break; - /* fillColumnWithNumericData(arrow_column, read_column); */ - // TODO: check that values smaller than INT32 are being read correctly #define DISPATCH(ARROW_NUMERIC_TYPE, CPP_NUMERIC_TYPE) \ case ARROW_NUMERIC_TYPE: \ fillColumnWithNumericData(arrow_column, read_column); \ @@ -250,10 +250,8 @@ Block ParquetBlockInputStream::readImpl() FOR_ARROW_NUMERIC_TYPES(DISPATCH) #undef DISPATCH - // TODO: arrow::Type::DATE32 - // TODO: arrow::Type::DATE64 - - // TODO: add other types + // TODO: support TIMESTAMP_MICROS and TIMESTAMP_MILLIS with truncated micro- and milliseconds? + // TODO: read JSON as a string? default: throw Exception("Unsupported parquet type " + arrow_column->type()->name()/*, ErrorCodes::TODO*/); diff --git a/dbms/src/DataStreams/ParquetBlockOutputStream.cpp b/dbms/src/DataStreams/ParquetBlockOutputStream.cpp index 3d5820e1c4f..c4d6540997b 100644 --- a/dbms/src/DataStreams/ParquetBlockOutputStream.cpp +++ b/dbms/src/DataStreams/ParquetBlockOutputStream.cpp @@ -24,11 +24,25 @@ template void ParquetBlockOutputStream::fillArrowArrayWithNumericColumnData(ColumnPtr write_column, std::shared_ptr & arrow_array) { const PaddedPODArray & internal_data = static_cast &>(*write_column).getData(); - ArrowBuilderType numeric_builder; - // TODO: check status.ok() - /*arrow::Status status = */numeric_builder.AppendValues(internal_data.data(), internal_data.size()); - /*arrow::Status status = */numeric_builder.Finish(&arrow_array); + + arrow::Status append_status = numeric_builder.AppendValues(internal_data.data(), internal_data.size()); + if (!append_status.ok()) + { + throw Exception( + "Error while building a parquet column \"" + write_column->getName() + "\": " + append_status.ToString()/*, + ErrorCodes::TODO*/ + ); + } + + arrow::Status finish_status = numeric_builder.Finish(&arrow_array); + if (!finish_status.ok()) + { + throw Exception( + "Error while writing a parquet column \"" + write_column->getName() + "\": " + finish_status.ToString()/*, + ErrorCodes::TODO*/ + ); + } } void ParquetBlockOutputStream::fillArrowArrayWithStringColumnData(ColumnPtr write_column, std::shared_ptr & arrow_array) @@ -39,12 +53,25 @@ void ParquetBlockOutputStream::fillArrowArrayWithStringColumnData(ColumnPtr writ for (size_t string_i = 0; string_i != internal_column.size(); ++string_i) { StringRef && string_ref = internal_column.getDataAt(string_i); - // TODO: check status.ok() - /*arrow::Status status = */string_builder.Append(string_ref.data, string_ref.size); + + arrow::Status append_status = string_builder.Append(string_ref.data, string_ref.size); + if (!append_status.ok()) + { + throw Exception( + "Error while building a parquet column \"" + write_column->getName() + "\": " + append_status.ToString()/*, + ErrorCodes::TODO*/ + ); + } } - // TODO: check status.ok() - /*arrow::Status status = */string_builder.Finish(&arrow_array); + arrow::Status finish_status = string_builder.Finish(&arrow_array); + if (!finish_status.ok()) + { + throw Exception( + "Error while writing a parquet column \"" + write_column->getName() + "\": " + finish_status.ToString()/*, + ErrorCodes::TODO*/ + ); + } } void ParquetBlockOutputStream::fillArrowArrayWithDateColumnData(ColumnPtr write_column, std::shared_ptr & arrow_array) @@ -54,12 +81,25 @@ void ParquetBlockOutputStream::fillArrowArrayWithDateColumnData(ColumnPtr write_ for (size_t value_i = 0; value_i != internal_data.size(); ++value_i) { - // TODO: check status.ok() /// Implicitly converts UInt16 to Int32 - /*arrow::Status status = */date32_builder.Append(internal_data[value_i]); + arrow::Status append_status = date32_builder.Append(internal_data[value_i]); + if (!append_status.ok()) + { + throw Exception( + "Error while building a parquet column \"" + write_column->getName() + "\": " + append_status.ToString()/*, + ErrorCodes::TODO*/ + ); + } } - /*arrow::Status status = */date32_builder.Finish(&arrow_array); + arrow::Status finish_status = date32_builder.Finish(&arrow_array); + if (!finish_status.ok()) + { + throw Exception( + "Error while writing a parquet column \"" + write_column->getName() + "\": " + finish_status.ToString()/*, + ErrorCodes::TODO*/ + ); + } } #define FOR_INTERNAL_NUMERIC_TYPES(M) \ @@ -138,14 +178,15 @@ void ParquetBlockOutputStream::write(const Block & block) // TODO: calculate row_group_size /* const UInt64 row_group_size = std::min(1, GiB_in_bytes / sizeof(UInt64) / arrow_table->num_rows()); */ - // TODO: check Status.ok() - arrow::Status status = parquet::arrow::WriteTable( + arrow::Status write_status = parquet::arrow::WriteTable( *arrow_table, arrow::default_memory_pool(), sink, /* row_group_size = */arrow_table->num_rows(), parquet::default_writer_properties(), parquet::arrow::default_arrow_writer_properties() ); - std::shared_ptr table_buffer = sink->GetBuffer(); + if (!write_status.ok()) + throw Exception("Error while writing a table: " + write_status.ToString()/*, ErrorCodes::TODO*/); + std::shared_ptr table_buffer = sink->GetBuffer(); writeString(reinterpret_cast(table_buffer->data()), table_buffer->size(), ostr); }