diff --git a/src/Processors/Formats/Impl/ORCBlockInputFormat.cpp b/src/Processors/Formats/Impl/ORCBlockInputFormat.cpp index 9af03e93c32..6f43addc4ed 100644 --- a/src/Processors/Formats/Impl/ORCBlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ORCBlockInputFormat.cpp @@ -19,6 +19,13 @@ namespace ErrorCodes extern const int CANNOT_READ_ALL_DATA; } +#define THROW_ARROW_NOT_OK(status) \ + do \ + { \ + if (::arrow::Status _s = (status); !_s.ok()) \ + throw Exception(_s.ToString(), ErrorCodes::BAD_ARGUMENTS); \ + } while (false) + ORCBlockInputFormat::ORCBlockInputFormat(ReadBuffer & in_, Block header_) : IInputFormat(std::move(header_), in_) { } @@ -28,21 +35,26 @@ Chunk ORCBlockInputFormat::generate() Chunk res; const Block & header = getPort().getHeader(); - if (file_reader) + if (!file_reader) + prepareReader(); + + if (stripe_current >= stripe_total) return res; - arrow::Status open_status = arrow::adapters::orc::ORCFileReader::Open(asArrowFile(in), arrow::default_memory_pool(), &file_reader); - if (!open_status.ok()) - throw Exception(open_status.ToString(), ErrorCodes::BAD_ARGUMENTS); + std::shared_ptr batch_result; + arrow::Status batch_status = file_reader->ReadStripe(stripe_current, include_indices, &batch_result); + if (!batch_status.ok()) + throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, + "Error while reading batch of ORC data: {}", batch_status.ToString()); - std::shared_ptr table; - arrow::Status read_status = file_reader->Read(&table); - if (!read_status.ok()) - throw ParsingException{"Error while reading ORC data: " + read_status.ToString(), - ErrorCodes::CANNOT_READ_ALL_DATA}; + auto table_result = arrow::Table::FromRecordBatches({batch_result}); + if (!table_result.ok()) + throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, + "Error while reading batch of ORC data: {}", table_result.status().ToString()); - ArrowColumnToCHColumn::arrowTableToCHChunk(res, table, header, "ORC"); + ++stripe_current; + ArrowColumnToCHColumn::arrowTableToCHChunk(res, *table_result, header, "ORC"); return res; } @@ -51,6 +63,26 @@ void ORCBlockInputFormat::resetParser() IInputFormat::resetParser(); file_reader.reset(); + include_indices.clear(); + stripe_current = 0; +} + +void ORCBlockInputFormat::prepareReader() +{ + THROW_ARROW_NOT_OK(arrow::adapters::orc::ORCFileReader::Open(asArrowFile(in), arrow::default_memory_pool(), &file_reader)); + stripe_total = file_reader->NumberOfStripes(); + stripe_current = 0; + + std::shared_ptr schema; + THROW_ARROW_NOT_OK(file_reader->ReadSchema(&schema)); + + for (int i = 0; i < schema->num_fields(); ++i) + { + if (getPort().getHeader().has(schema->field(i)->name())) + { + include_indices.push_back(i+1); + } + } } void registerInputFormatProcessorORC(FormatFactory &factory) diff --git a/src/Processors/Formats/Impl/ORCBlockInputFormat.h b/src/Processors/Formats/Impl/ORCBlockInputFormat.h index cff42560366..0c78290f3cc 100644 --- a/src/Processors/Formats/Impl/ORCBlockInputFormat.h +++ b/src/Processors/Formats/Impl/ORCBlockInputFormat.h @@ -25,6 +25,15 @@ private: // TODO: check that this class implements every part of its parent std::unique_ptr file_reader; + + int stripe_total = 0; + + int stripe_current = 0; + + // indices of columns to read from ORC file + std::vector include_indices; + + void prepareReader(); }; }