Merge pull request #23102 from godliness/read-orc-stripe

Read ORC file by stripe to reduce memory cost
This commit is contained in:
Nikita Mikhaylov 2021-04-19 14:45:40 +03:00 committed by GitHub
commit 6b928c4b6a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 51 additions and 10 deletions

View File

@ -19,6 +19,13 @@ namespace ErrorCodes
extern const int CANNOT_READ_ALL_DATA; extern const int CANNOT_READ_ALL_DATA;
} }
#define THROW_ARROW_NOT_OK(status) \
do \
{ \
if (::arrow::Status _s = (status); !_s.ok()) \
throw Exception(_s.ToString(), ErrorCodes::BAD_ARGUMENTS); \
} while (false)
ORCBlockInputFormat::ORCBlockInputFormat(ReadBuffer & in_, Block header_) : IInputFormat(std::move(header_), in_) ORCBlockInputFormat::ORCBlockInputFormat(ReadBuffer & in_, Block header_) : IInputFormat(std::move(header_), in_)
{ {
} }
@ -28,21 +35,26 @@ Chunk ORCBlockInputFormat::generate()
Chunk res; Chunk res;
const Block & header = getPort().getHeader(); const Block & header = getPort().getHeader();
if (file_reader) if (!file_reader)
prepareReader();
if (stripe_current >= stripe_total)
return res; return res;
arrow::Status open_status = arrow::adapters::orc::ORCFileReader::Open(asArrowFile(in), arrow::default_memory_pool(), &file_reader); std::shared_ptr<arrow::RecordBatch> batch_result;
if (!open_status.ok()) arrow::Status batch_status = file_reader->ReadStripe(stripe_current, include_indices, &batch_result);
throw Exception(open_status.ToString(), ErrorCodes::BAD_ARGUMENTS); if (!batch_status.ok())
throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA,
"Error while reading batch of ORC data: {}", batch_status.ToString());
std::shared_ptr<arrow::Table> table; auto table_result = arrow::Table::FromRecordBatches({batch_result});
arrow::Status read_status = file_reader->Read(&table); if (!table_result.ok())
if (!read_status.ok()) throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA,
throw ParsingException{"Error while reading ORC data: " + read_status.ToString(), "Error while reading batch of ORC data: {}", table_result.status().ToString());
ErrorCodes::CANNOT_READ_ALL_DATA};
ArrowColumnToCHColumn::arrowTableToCHChunk(res, table, header, "ORC"); ++stripe_current;
ArrowColumnToCHColumn::arrowTableToCHChunk(res, *table_result, header, "ORC");
return res; return res;
} }
@ -51,6 +63,26 @@ void ORCBlockInputFormat::resetParser()
IInputFormat::resetParser(); IInputFormat::resetParser();
file_reader.reset(); file_reader.reset();
include_indices.clear();
stripe_current = 0;
}
void ORCBlockInputFormat::prepareReader()
{
THROW_ARROW_NOT_OK(arrow::adapters::orc::ORCFileReader::Open(asArrowFile(in), arrow::default_memory_pool(), &file_reader));
stripe_total = file_reader->NumberOfStripes();
stripe_current = 0;
std::shared_ptr<arrow::Schema> schema;
THROW_ARROW_NOT_OK(file_reader->ReadSchema(&schema));
for (int i = 0; i < schema->num_fields(); ++i)
{
if (getPort().getHeader().has(schema->field(i)->name()))
{
include_indices.push_back(i+1);
}
}
} }
void registerInputFormatProcessorORC(FormatFactory &factory) void registerInputFormatProcessorORC(FormatFactory &factory)

View File

@ -25,6 +25,15 @@ private:
// TODO: check that this class implements every part of its parent // TODO: check that this class implements every part of its parent
std::unique_ptr<arrow::adapters::orc::ORCFileReader> file_reader; std::unique_ptr<arrow::adapters::orc::ORCFileReader> file_reader;
int stripe_total = 0;
int stripe_current = 0;
// indices of columns to read from ORC file
std::vector<int> include_indices;
void prepareReader();
}; };
} }