mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 15:12:02 +00:00
Arrow input format
This commit is contained in:
parent
f34a6dc482
commit
538e6c39da
@ -70,6 +70,7 @@ elseif(NOT MISSING_INTERNAL_PARQUET_LIBRARY AND NOT OS_FREEBSD)
|
||||
|
||||
set(USE_PARQUET 1)
|
||||
set(USE_ORC 1)
|
||||
set(USE_ARROW 1)
|
||||
endif()
|
||||
endif()
|
||||
|
||||
|
@ -356,6 +356,7 @@ FormatFactory::FormatFactory()
|
||||
registerInputFormatProcessorORC(*this);
|
||||
registerInputFormatProcessorParquet(*this);
|
||||
registerOutputFormatProcessorParquet(*this);
|
||||
registerInputFormatProcessorArrow(*this);
|
||||
registerInputFormatProcessorAvro(*this);
|
||||
registerOutputFormatProcessorAvro(*this);
|
||||
#endif
|
||||
|
@ -166,6 +166,7 @@ void registerOutputFormatProcessorJSONCompactEachRow(FormatFactory & factory);
|
||||
void registerInputFormatProcessorParquet(FormatFactory & factory);
|
||||
void registerInputFormatProcessorORC(FormatFactory & factory);
|
||||
void registerOutputFormatProcessorParquet(FormatFactory & factory);
|
||||
void registerInputFormatProcessorArrow(FormatFactory & factory);
|
||||
void registerInputFormatProcessorProtobuf(FormatFactory & factory);
|
||||
void registerOutputFormatProcessorProtobuf(FormatFactory & factory);
|
||||
void registerInputFormatProcessorAvro(FormatFactory & factory);
|
||||
|
@ -7,4 +7,5 @@
|
||||
#cmakedefine01 USE_SNAPPY
|
||||
#cmakedefine01 USE_PARQUET
|
||||
#cmakedefine01 USE_ORC
|
||||
#cmakedefine01 USE_ARROW
|
||||
#cmakedefine01 USE_PROTOBUF
|
||||
|
113
src/Processors/Formats/Impl/ArrowBlockInputFormat.cpp
Normal file
113
src/Processors/Formats/Impl/ArrowBlockInputFormat.cpp
Normal file
@ -0,0 +1,113 @@
|
||||
#include "ArrowBlockInputFormat.h"
|
||||
#if USE_ARROW
|
||||
|
||||
#include <Formats/FormatFactory.h>
|
||||
#include <IO/ReadBufferFromMemory.h>
|
||||
#include <IO/WriteBufferFromString.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <IO/copyData.h>
|
||||
#include <arrow/api.h>
|
||||
#include <arrow/io/memory.h>
|
||||
#include <arrow/ipc/reader.h>
|
||||
#include <arrow/status.h>
|
||||
#include "ArrowColumnToCHColumn.h"
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int CANNOT_READ_ALL_DATA;
|
||||
extern const int BAD_ARGUMENTS;
|
||||
}
|
||||
|
||||
|
||||
/// Builds the format on top of the read buffer `in_`; the header block is moved into the IInputFormat base.
ArrowBlockInputFormat::ArrowBlockInputFormat(ReadBuffer & in_, Block header_)
    : IInputFormat(std::move(header_), in_)
{
}
|
||||
|
||||
/** Produces the next chunk of the Arrow file: one record batch per call.
  *
  * While `in` still has unread data, the whole stream is copied into `file_data` and opened with
  * arrow::ipc::RecordBatchFileReader; `row_group_total` is set to the number of record batches
  * and `row_group_current` is reset to 0.
  * Every call then reads record batch number `row_group_current`, wraps it into an arrow::Table
  * and hands it to ArrowColumnToCHColumn::arrowTableToCHChunk to fill the result.
  *
  * Returns an empty Chunk when there is no input or when all record batches have been consumed.
  *
  * Throws CANNOT_READ_ALL_DATA when
  *  - new input appears before the previous batches were consumed;
  *  - the Arrow file cannot be opened;
  *  - a record batch cannot be read or converted into a table.
  */
Chunk ArrowBlockInputFormat::generate()
{
    Chunk res;

    const auto & header = getPort().getHeader();

    if (!in.eof())
    {
        if (row_group_current < row_group_total)
            throw Exception{"Got new data, but data from previous chunks was not read " +
                            std::to_string(row_group_current) + "/" + std::to_string(row_group_total),
                            ErrorCodes::CANNOT_READ_ALL_DATA};

        file_data.clear();
        {
            /// Scoped so that file_buffer is destroyed before file_data is used below
            /// (presumably the destructor is what makes file_data complete - confirm).
            WriteBufferFromString file_buffer(file_data);
            copyData(in, file_buffer);
        }

        std::unique_ptr<arrow::Buffer> local_buffer = std::make_unique<arrow::Buffer>(file_data);

        std::shared_ptr<arrow::io::RandomAccessFile> in_stream = std::make_shared<arrow::io::BufferReader>(*local_buffer);

        /// A file that cannot be opened is an error, not an empty table: report it instead of
        /// silently returning no rows.
        arrow::Status open_status = arrow::ipc::RecordBatchFileReader::Open(in_stream, &file_reader);
        if (!open_status.ok())
            throw Exception{"Cannot open Arrow file: " + open_status.ToString(), ErrorCodes::CANNOT_READ_ALL_DATA};

        row_group_total = file_reader->num_record_batches();
        row_group_current = 0;
    }

    /// No early return when `in` is already exhausted: after the first call the input is at EOF,
    /// but the remaining record batches of the already opened file still have to be produced.
    /// When nothing was ever read both counters are 0 and this check returns the empty chunk.
    if (row_group_current >= row_group_total)
        return res;

    std::vector<std::shared_ptr<arrow::RecordBatch>> single_batch(1);
    arrow::Status read_status = file_reader->ReadRecordBatch(row_group_current, &single_batch[0]);

    /// Check before building the table: after a failed read the batch pointer may be null.
    if (!read_status.ok())
        throw Exception{"Error while reading batch of Arrow data: " + read_status.ToString(),
                        ErrorCodes::CANNOT_READ_ALL_DATA};

    std::shared_ptr<arrow::Table> table;
    arrow::Status make_status = arrow::Table::FromRecordBatches(single_batch, &table);
    if (!make_status.ok())
        throw Exception{"Cannot make table from record batch", ErrorCodes::CANNOT_READ_ALL_DATA};

    /// NOTE(review): nothing in this function advances row_group_current; it is passed to
    /// arrowTableToCHChunk, which is presumably where it is incremented - confirm, otherwise
    /// the same batch would be produced on every call.
    ArrowColumnToCHColumn::arrowTableToCHChunk(res, table, read_status, header, row_group_current, "Arrow");

    return res;
}
|
||||
|
||||
void ArrowBlockInputFormat::resetParser()
|
||||
{
|
||||
IInputFormat::resetParser();
|
||||
|
||||
file_reader.reset();
|
||||
file_data.clear();
|
||||
row_group_total = 0;
|
||||
row_group_current = 0;
|
||||
}
|
||||
|
||||
/// Registers the "Arrow" input format in the factory.
/// The creator ignores the row-input params and the format settings and only needs
/// the read buffer and the sample block.
void registerInputFormatProcessorArrow(FormatFactory & factory)
{
    auto create_arrow_input = [](
        ReadBuffer & buf,
        const Block & sample,
        const RowInputFormatParams & /* params */,
        const FormatSettings & /* settings */)
    {
        return std::make_shared<ArrowBlockInputFormat>(buf, sample);
    };

    factory.registerInputFormatProcessor("Arrow", create_arrow_input);
}
|
||||
|
||||
}
|
||||
#else
|
||||
|
||||
namespace DB
|
||||
{
|
||||
class FormatFactory;
|
||||
/// Stub compiled when USE_ARROW is 0: keeps the symbol available, registers nothing.
void registerInputFormatProcessorArrow(FormatFactory & /* factory */)
{
}
|
||||
}
|
||||
|
||||
#endif
|
35
src/Processors/Formats/Impl/ArrowBlockInputFormat.h
Normal file
35
src/Processors/Formats/Impl/ArrowBlockInputFormat.h
Normal file
@ -0,0 +1,35 @@
|
||||
#pragma once
|
||||
|
||||
#include "config_formats.h"
|
||||
#if USE_ARROW
|
||||
|
||||
#include <Processors/Formats/IInputFormat.h>
|
||||
|
||||
namespace arrow::ipc { class RecordBatchFileReader; }
|
||||
|
||||
namespace DB
|
||||
{
|
||||
class Context;
|
||||
|
||||
class ArrowBlockInputFormat: public IInputFormat
|
||||
{
|
||||
public:
|
||||
ArrowBlockInputFormat(ReadBuffer & in_, Block header_);
|
||||
|
||||
void resetParser() override;
|
||||
|
||||
String getName() const override { return "ArrowBlockInputFormat"; }
|
||||
|
||||
protected:
|
||||
Chunk generate() override;
|
||||
|
||||
private:
|
||||
std::shared_ptr<arrow::ipc::RecordBatchFileReader> file_reader;
|
||||
std::string file_data;
|
||||
int row_group_total = 0;
|
||||
int row_group_current = 0;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
#endif
|
@ -1,7 +1,7 @@
|
||||
#include "config_formats.h"
|
||||
#include "ArrowColumnToCHColumn.h"
|
||||
|
||||
#if USE_ORC || USE_PARQUET
|
||||
#if USE_ARROW || USE_ORC || USE_PARQUET
|
||||
#include <DataTypes/DataTypeFactory.h>
|
||||
#include <DataTypes/DataTypeNullable.h>
|
||||
#include <DataTypes/DataTypeString.h>
|
||||
|
@ -1,6 +1,6 @@
|
||||
#include "config_formats.h"
|
||||
|
||||
#if USE_ORC || USE_PARQUET
|
||||
#if USE_ARROW || USE_ORC || USE_PARQUET
|
||||
|
||||
#include <DataTypes/IDataType.h>
|
||||
#include <DataTypes/DataTypesNumber.h>
|
||||
|
Loading…
Reference in New Issue
Block a user