ClickHouse/src/Processors/Formats/Impl/ArrowBlockInputFormat.cpp

151 lines
4.1 KiB
C++
Raw Normal View History

2020-04-28 19:52:22 +00:00
#include "ArrowBlockInputFormat.h"
2021-06-02 08:51:07 +00:00
2020-04-28 19:52:22 +00:00
#if USE_ARROW
#include <Formats/FormatFactory.h>
#include <IO/ReadBufferFromMemory.h>
#include <IO/WriteHelpers.h>
#include <IO/copyData.h>
#include <arrow/api.h>
#include <arrow/ipc/reader.h>
#include <arrow/result.h>
2020-05-05 12:56:54 +00:00
#include "ArrowBufferedStreams.h"
2020-04-28 19:52:22 +00:00
#include "ArrowColumnToCHColumn.h"
2020-07-13 01:11:35 +00:00
2020-04-28 19:52:22 +00:00
namespace DB
{
/// Error codes referenced in this file. They are only declared here (extern);
/// the definitions live elsewhere in the project.
namespace ErrorCodes
{
    extern const int UNKNOWN_EXCEPTION;     /// thrown by prepareReader() when an Arrow reader cannot be opened
    extern const int CANNOT_READ_ALL_DATA;  /// thrown by generate() when a record batch cannot be read or converted
}
/// Constructs the input format over the buffer `in_` for the block structure `header_`.
///
/// `stream_` selects which Arrow IPC reader is used later on:
///   true  -> arrow::ipc::RecordBatchStreamReader
///   false -> arrow::ipc::RecordBatchFileReader
/// No reader is opened here; generate() calls prepareReader() on first use.
ArrowBlockInputFormat::ArrowBlockInputFormat(ReadBuffer & in_, const Block & header_, bool stream_, const FormatSettings & format_settings_)
    : IInputFormat(header_, in_), stream{stream_}, format_settings(format_settings_)
{
}
/// Reads the next Arrow record batch and converts it into a Chunk.
///
/// Returns an empty Chunk when there is nothing left to read:
///  - stream mode: the reader returned OK with a null batch;
///  - file mode: record_batch_current has reached record_batch_total.
///
/// Throws ParsingException (CANNOT_READ_ALL_DATA) if the batch cannot be read
/// or cannot be wrapped into an arrow::Table.
Chunk ArrowBlockInputFormat::generate()
{
    Chunk res;
    arrow::Result<std::shared_ptr<arrow::RecordBatch>> batch_result;

    if (stream)
    {
        /// The reader is created lazily on the first call (and again after resetParser()).
        if (!stream_reader)
            prepareReader();

        batch_result = stream_reader->Next();

        /// A successful read that yields a null batch means the stream is exhausted.
        if (batch_result.ok() && !(*batch_result))
            return res;
    }
    else
    {
        if (!file_reader)
            prepareReader();

        /// Every batch of the file has already been returned.
        if (record_batch_current >= record_batch_total)
            return res;

        batch_result = file_reader->ReadRecordBatch(record_batch_current);
    }

    if (!batch_result.ok())
        throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA,
            "Error while reading batch of Arrow data: {}", batch_result.status().ToString());

    /// The converter works on arrow::Table, so wrap the single batch into a table.
    auto table_result = arrow::Table::FromRecordBatches({*batch_result});
    if (!table_result.ok())
        throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA,
            "Error while reading batch of Arrow data: {}", table_result.status().ToString());

    /// Advance only after the batch was read and wrapped successfully.
    ++record_batch_current;

    arrow_column_to_ch_column->arrowTableToCHChunk(res, *table_result);

    return res;
}
/// Resets the parser so that reading can start over.
/// The base-class state is reset first, then the reader belonging to the current
/// mode is dropped and the batch counter is rewound.
void ArrowBlockInputFormat::resetParser()
{
    IInputFormat::resetParser();

    /// Only the reader for the active mode is released; generate() calls
    /// prepareReader() again when it finds that reader empty.
    if (!stream)
    {
        file_reader.reset();
    }
    else
    {
        stream_reader.reset();
    }

    record_batch_current = 0;
}
void ArrowBlockInputFormat::prepareReader()
{
2021-06-07 15:15:58 +00:00
std::shared_ptr<arrow::Schema> schema;
if (stream)
{
auto stream_reader_status = arrow::ipc::RecordBatchStreamReader::Open(std::make_unique<ArrowInputStreamFromReadBuffer>(*in));
if (!stream_reader_status.ok())
throw Exception(ErrorCodes::UNKNOWN_EXCEPTION,
"Error while opening a table: {}", stream_reader_status.status().ToString());
stream_reader = *stream_reader_status;
2021-06-07 15:15:58 +00:00
schema = stream_reader->schema();
}
else
{
2021-10-31 19:53:24 +00:00
auto file_reader_status = arrow::ipc::RecordBatchFileReader::Open(asArrowFile(*in, format_settings));
if (!file_reader_status.ok())
throw Exception(ErrorCodes::UNKNOWN_EXCEPTION,
"Error while opening a table: {}", file_reader_status.status().ToString());
file_reader = *file_reader_status;
2021-06-07 15:15:58 +00:00
schema = file_reader->schema();
}
arrow_column_to_ch_column = std::make_unique<ArrowColumnToCHColumn>(
getPort().getHeader(), "Arrow", format_settings.arrow.import_nested, format_settings.defaults_for_omitted_fields);
2021-06-07 15:15:58 +00:00
if (stream)
record_batch_total = -1;
else
record_batch_total = file_reader->num_record_batches();
2020-05-04 14:28:36 +00:00
record_batch_current = 0;
2020-04-28 19:52:22 +00:00
}
2021-10-11 16:11:50 +00:00
/// Registers the two Arrow input formats in the factory:
///  - "Arrow"       -> ArrowBlockInputFormat in file mode (stream = false),
///                     additionally marked as column oriented;
///  - "ArrowStream" -> ArrowBlockInputFormat in stream mode (stream = true).
void registerInputFormatArrow(FormatFactory & factory)
{
    factory.registerInputFormat(
        "Arrow",
        [](ReadBuffer & buf,
           const Block & sample,
           const RowInputFormatParams & /* params */,
           const FormatSettings & format_settings)
        {
            return std::make_shared<ArrowBlockInputFormat>(buf, sample, false, format_settings);
        });
    factory.markFormatAsColumnOriented("Arrow");

    factory.registerInputFormat(
        "ArrowStream",
        [](ReadBuffer & buf,
           const Block & sample,
           const RowInputFormatParams & /* params */,
           const FormatSettings & format_settings)
        {
            return std::make_shared<ArrowBlockInputFormat>(buf, sample, true, format_settings);
        });
}
}
#else
namespace DB
{
class FormatFactory;

/// Fallback used when USE_ARROW is off: the function still exists so callers
/// need no conditional code, but it registers nothing.
void registerInputFormatArrow(FormatFactory &)
{
}
}
2020-05-02 19:32:21 +00:00
#endif