Simplify ORC format

This commit is contained in:
FawnD2 2020-05-04 02:19:56 +03:00
parent 112758b99d
commit a554177724
2 changed files with 19 additions and 43 deletions

View File

@ -2,11 +2,11 @@
#if USE_ORC
#include <Formats/FormatFactory.h>
#include <IO/BufferBase.h>
#include <IO/ReadBufferFromMemory.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <IO/copyData.h>
#include <arrow/adapters/orc/adapter.h>
#include <arrow/io/memory.h>
#include "ArrowColumnToCHColumn.h"
@ -15,10 +15,10 @@ namespace DB
/// Error codes used by the exceptions thrown in this file. They are only declared here
/// (extern), so their definitions are provided by another translation unit.
namespace ErrorCodes
{
/// Used when opening the ORC file reader fails.
extern const int BAD_ARGUMENTS;
/// Used when reading the ORC data into an Arrow table fails.
extern const int CANNOT_READ_ALL_DATA;
}
/// Constructs the ORC input format.
/// `in_` is the read buffer and `header_` is the header block; the header is moved and both
/// are forwarded to the IInputFormat base constructor. The body is intentionally empty:
/// nothing else is initialized here.
ORCBlockInputFormat::ORCBlockInputFormat(ReadBuffer & in_, Block header_) : IInputFormat(std::move(header_), in_)
{
}
@ -27,48 +27,30 @@ Chunk ORCBlockInputFormat::generate()
{
Chunk res;
const auto & header = getPort().getHeader();
if (in.eof())
return res;
if (!in.eof())
file_data.clear();
{
if (row_group_current < row_group_total)
throw Exception{"Got new data, but data from previous chunks was not read " +
std::to_string(row_group_current) + "/" + std::to_string(row_group_total),
ErrorCodes::CANNOT_READ_ALL_DATA};
file_data.clear();
{
WriteBufferFromString file_buffer(file_data);
copyData(in, file_buffer);
}
std::unique_ptr<arrow::Buffer> local_buffer = std::make_unique<arrow::Buffer>(file_data);
std::shared_ptr<arrow::io::RandomAccessFile> in_stream = std::make_shared<arrow::io::BufferReader>(*local_buffer);
bool ok = arrow::adapters::orc::ORCFileReader::Open(in_stream, arrow::default_memory_pool(),
&file_reader).ok();
if (!ok)
return res;
row_group_total = file_reader->NumberOfRows();
row_group_current = 0;
WriteBufferFromString file_buffer(file_data);
copyData(in, file_buffer);
}
else
return res;
if (row_group_current >= row_group_total)
return res;
std::unique_ptr<arrow::Buffer> local_buffer = std::make_unique<arrow::Buffer>(file_data);
std::shared_ptr<arrow::io::RandomAccessFile> in_stream = std::make_shared<arrow::io::BufferReader>(*local_buffer);
arrow::Status open_status = arrow::adapters::orc::ORCFileReader::Open(in_stream, arrow::default_memory_pool(), &file_reader);
if (!open_status.ok())
throw Exception(open_status.ToString(), ErrorCodes::BAD_ARGUMENTS);
std::shared_ptr<arrow::Table> table;
arrow::Status read_status = file_reader->Read(&table);
if (!read_status.ok())
throw Exception{"Error while reading ORC data: " + read_status.ToString(),
ErrorCodes::CANNOT_READ_ALL_DATA};
++row_group_current;
const Block & header = getPort().getHeader();
ArrowColumnToCHColumn::arrowTableToCHChunk(res, table, header, "ORC");
@ -81,8 +63,6 @@ void ORCBlockInputFormat::resetParser()
file_reader.reset();
file_data.clear();
row_group_total = 0;
row_group_current = 0;
}
void registerInputFormatProcessorORC(FormatFactory &factory)
@ -93,7 +73,7 @@ void registerInputFormatProcessorORC(FormatFactory &factory)
const Block &sample,
const RowInputFormatParams &,
const FormatSettings & /* settings */)
{
{
return std::make_shared<ORCBlockInputFormat>(buf, sample);
});
}

View File

@ -1,14 +1,12 @@
#pragma once
#include "config_formats.h"
#if USE_ORC
#include <DataStreams/IBlockInputStream.h>
#include <Processors/Chunk.h>
#include <Processors/Formats/IInputFormat.h>
#if USE_ORC
#include "arrow/adapters/orc/adapter.h"
#include "arrow/io/interfaces.h"
namespace arrow::adapters::orc { class ORCFileReader; }
namespace DB
{
@ -32,8 +30,6 @@ private:
std::unique_ptr<arrow::adapters::orc::ORCFileReader> file_reader;
std::string file_data;
int row_group_total = 0;
int row_group_current = 0;
};
}