#include "ORCBlockInputFormat.h" #if USE_ORC #include #include #include #include #include #include #include #include "ArrowColumnToCHColumn.h" namespace DB { namespace ErrorCodes { extern const int CANNOT_READ_ALL_DATA; } ORCBlockInputFormat::ORCBlockInputFormat(ReadBuffer &in_, Block header_) : IInputFormat(std::move(header_), in_) { } Chunk ORCBlockInputFormat::generate() { Chunk res; auto &header = getPort().getHeader(); if (!in.eof()) { if (row_group_current < row_group_total) throw Exception{"Got new data, but data from previous chunks was not read " + std::to_string(row_group_current) + "/" + std::to_string(row_group_total), ErrorCodes::CANNOT_READ_ALL_DATA}; file_data.clear(); { WriteBufferFromString file_buffer(file_data); copyData(in, file_buffer); } std::unique_ptr local_buffer = std::make_unique(file_data); std::shared_ptr in_stream(new arrow::io::BufferReader(*local_buffer)); bool ok = arrow::adapters::orc::ORCFileReader::Open(in_stream, arrow::default_memory_pool(), &file_reader).ok(); if (!ok) return res; row_group_total = file_reader->NumberOfRows(); row_group_current = 0; } else return res; if (row_group_current >= row_group_total) return res; std::shared_ptr table; arrow::Status read_status = file_reader->Read(&table); ArrowColumnToCHColumn::arrowTableToCHChunk(res, table, read_status, header, row_group_current, "ORC"); return res; } void ORCBlockInputFormat::resetParser() { IInputFormat::resetParser(); file_reader.reset(); file_data.clear(); row_group_total = 0; row_group_current = 0; } void registerInputFormatProcessorORC(FormatFactory &factory) { factory.registerInputFormatProcessor( "ORC", [](ReadBuffer &buf, const Block &sample, const RowInputFormatParams &, const FormatSettings & /* settings */) { return std::make_shared(buf, sample); }); } } #else namespace DB { namespace ErrorCodes { extern const int CANNOT_READ_ALL_DATA; } class FormatFactory; void registerInputFormatProcessorORC(FormatFactory &) { } } #endif