Arrow output format

This commit is contained in:
FawnD2 2020-05-03 05:46:36 +03:00
parent aeed62d5e7
commit 6e0b31fb2a
8 changed files with 189 additions and 45 deletions

View File

@ -357,6 +357,7 @@ FormatFactory::FormatFactory()
registerInputFormatProcessorParquet(*this);
registerOutputFormatProcessorParquet(*this);
registerInputFormatProcessorArrow(*this);
registerOutputFormatProcessorArrow(*this);
registerInputFormatProcessorAvro(*this);
registerOutputFormatProcessorAvro(*this);
#endif

View File

@ -167,6 +167,7 @@ void registerInputFormatProcessorParquet(FormatFactory & factory);
void registerInputFormatProcessorORC(FormatFactory & factory);
void registerOutputFormatProcessorParquet(FormatFactory & factory);
void registerInputFormatProcessorArrow(FormatFactory & factory);
void registerOutputFormatProcessorArrow(FormatFactory & factory);
void registerInputFormatProcessorProtobuf(FormatFactory & factory);
void registerOutputFormatProcessorProtobuf(FormatFactory & factory);
void registerInputFormatProcessorAvro(FormatFactory & factory);

View File

@ -0,0 +1,81 @@
#include "ArrowBlockOutputFormat.h"
#if USE_ARROW
#include <Formats/FormatFactory.h>
#include <arrow/table.h>
#include <arrow/ipc/writer.h>
#include "ArrowBufferedOutputStream.h"
#include "CHColumnToArrowColumn.h"
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_EXCEPTION;
}
/// Builds the format on top of `out_`: the base class receives the header and the buffer,
/// the settings are copied, and the Arrow stream adapter is bound to the same buffer.
/// The Arrow writer itself is not created here.
ArrowBlockOutputFormat::ArrowBlockOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_)
    : IOutputFormat(header_, out_)
    , format_settings(format_settings_)
    , arrow_ostream(out_)
{
}
/// Converts one chunk into an Arrow table and writes it to the output stream.
/// The writer is opened on the first call, using the schema of the first converted table.
/// Throws Exception(UNKNOWN_EXCEPTION) if Arrow reports a failure while opening or writing.
void ArrowBlockOutputFormat::consume(Chunk chunk)
{
    const Block & sample = getPort(PortKind::Main).getHeader();
    const size_t num_columns = chunk.getNumColumns();

    std::shared_ptr<arrow::Table> table;
    CHColumnToArrowColumn::CHChunkToArrowTable(table, sample, chunk, num_columns, "Arrow");

    if (!writer)
    {
        // TODO: should we use arrow::ipc::IpcOptions::alignment?
        auto open_status = arrow::ipc::RecordBatchFileWriter::Open(&arrow_ostream, table->schema(), &writer);
        if (!open_status.ok())
            throw Exception{"Error while opening a table: " + open_status.ToString(), ErrorCodes::UNKNOWN_EXCEPTION};
    }

    // TODO: calculate row_group_size depending on a number of rows and table size
    // NOTE: the Parquet row group setting is reused here as the second argument of WriteTable.
    auto write_status = writer->WriteTable(*table, format_settings.parquet.row_group_size);
    if (!write_status.ok())
        throw Exception{"Error while writing a table: " + write_status.ToString(), ErrorCodes::UNKNOWN_EXCEPTION};
}
void ArrowBlockOutputFormat::finalize()
{
if (writer)
{
auto status = writer->Close();
if (!status.ok())
throw Exception{"Error while closing a table: " + status.ToString(), ErrorCodes::UNKNOWN_EXCEPTION};
}
}
/// Registers the "Arrow" output format in the factory.
/// The creator ignores the write callback and builds an ArrowBlockOutputFormat
/// from the buffer, the sample block and the format settings.
void registerOutputFormatProcessorArrow(FormatFactory & factory)
{
    auto make_arrow_output = [](
        WriteBuffer & buf,
        const Block & sample,
        FormatFactory::WriteCallback,
        const FormatSettings & format_settings)
    {
        return std::make_shared<ArrowBlockOutputFormat>(buf, sample, format_settings);
    };

    factory.registerOutputFormatProcessor("Arrow", make_arrow_output);
}
}
#else
namespace DB
{
class FormatFactory;
/// Stub used when USE_ARROW is disabled: keeps the symbol defined and registers nothing.
void registerOutputFormatProcessorArrow(FormatFactory &)
{
}
}
#endif

View File

@ -0,0 +1,34 @@
#pragma once
#include "config_formats.h"
#if USE_ARROW
#include <Core/Types.h>
#include <Formats/FormatSettings.h>
#include <Processors/Formats/IOutputFormat.h>
#include "ArrowBufferedOutputStream.h"
namespace arrow::ipc { class RecordBatchWriter; }
namespace DB
{
/// Output format that converts chunks to Arrow tables and writes them through an Arrow IPC writer.
class ArrowBlockOutputFormat : public IOutputFormat
{
public:
ArrowBlockOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_);
String getName() const override { return "ArrowBlockOutputFormat"; }
/// Converts the chunk to an Arrow table and writes it; opens the writer on first use.
void consume(Chunk) override;
/// Closes the writer.
void finalize() override;
String getContentType() const override { return "application/octet-stream"; }
private:
/// Copy of the settings passed to the constructor.
const FormatSettings format_settings;
/// Arrow stream adapter over the WriteBuffer passed to the constructor.
ArrowBufferedOutputStream arrow_ostream;
/// Null until consume() opens it.
std::shared_ptr<arrow::ipc::RecordBatchWriter> writer;
};
}
#endif

View File

@ -0,0 +1,33 @@
#include "ArrowBufferedOutputStream.h"
#if USE_PARQUET || USE_ARROW
#include <arrow/status.h>
namespace DB
{
/// Binds the stream to `ostr_` (kept by reference) and marks it as open.
ArrowBufferedOutputStream::ArrowBufferedOutputStream(WriteBuffer & ostr_)
    : ostr(ostr_)
    , is_open(true)
{
}
/// Marks the stream as closed and reports success.
/// Only the open flag changes; the underlying WriteBuffer is not touched here.
::arrow::Status ArrowBufferedOutputStream::Close()
{
is_open = false;
return ::arrow::Status::OK();
}
/// Stores the number of bytes passed to Write() so far into `*position` and reports success.
/// `position` is dereferenced without a null check.
::arrow::Status ArrowBufferedOutputStream::Tell(int64_t * position) const
{
*position = total_length;
return ::arrow::Status::OK();
}
/// Forwards `length` bytes starting at `data` to the underlying WriteBuffer and
/// advances the position reported by Tell(). Always returns OK.
::arrow::Status ArrowBufferedOutputStream::Write(const void * data, int64_t length)
{
    const char * bytes = static_cast<const char *>(data);
    ostr.write(bytes, length);
    total_length += length;
    return ::arrow::Status::OK();
}
}
#endif

View File

@ -0,0 +1,35 @@
#pragma once
#include "config_formats.h"
#if USE_PARQUET || USE_ARROW
#include <IO/WriteBuffer.h>
#include <arrow/io/interfaces.h>
namespace DB
{
/// arrow::io::OutputStream implementation that forwards written bytes to a WriteBuffer
/// and keeps track of how many bytes have been written.
class ArrowBufferedOutputStream : public arrow::io::OutputStream
{
public:
/// The buffer is held by reference, so it must outlive this stream.
explicit ArrowBufferedOutputStream(WriteBuffer & ostr_);
/// FileInterface
::arrow::Status Close() override;
::arrow::Status Tell(int64_t * position) const override;
bool closed() const override { return !is_open; }
/// Writable
::arrow::Status Write(const void * data, int64_t length) override;
private:
/// Destination of all written bytes.
WriteBuffer & ostr;
/// Total number of bytes passed to Write(); reported by Tell().
int64_t total_length = 0;
/// Set to true by the constructor and to false by Close().
bool is_open = false;
ARROW_DISALLOW_COPY_AND_ASSIGN(ArrowBufferedOutputStream);
};
}
#endif

View File

@ -51,7 +51,7 @@ namespace DB
return &null_bytemap;
}
static void checkStatus(arrow::Status & status, const String & column_name, const String & format_name)
static void checkStatus(const arrow::Status & status, const String & column_name, const String & format_name)
{
if (!status.ok())
throw Exception{"Error with a " + format_name + " column \"" + column_name + "\": " + status.ToString(), ErrorCodes::UNKNOWN_EXCEPTION};

View File

@ -11,10 +11,10 @@
#include <Formats/FormatFactory.h>
#include <IO/WriteHelpers.h>
#include <arrow/api.h>
#include <arrow/util/decimal.h>
#include <arrow/util/memory.h>
#include <parquet/arrow/writer.h>
#include <parquet/deprecated_io.h>
#include "ArrowBufferedOutputStream.h"
#include "CHColumnToArrowColumn.h"
@ -30,44 +30,6 @@ ParquetBlockOutputFormat::ParquetBlockOutputFormat(WriteBuffer & out_, const Blo
{
}
/// arrow::io::OutputStream implementation that forwards written bytes to a WriteBuffer
/// and counts them.
class OstreamOutputStream : public arrow::io::OutputStream
{
public:
/// Binds the stream to `ostr_` (held by reference) and marks it as open.
explicit OstreamOutputStream(WriteBuffer & ostr_) : ostr(ostr_) { is_open = true; }
~OstreamOutputStream() override = default;
// FileInterface
/// Marks the stream as closed; the underlying buffer is not touched here.
::arrow::Status Close() override
{
is_open = false;
return ::arrow::Status::OK();
}
/// Reports the number of bytes written so far.
::arrow::Status Tell(int64_t* position) const override
{
*position = total_length;
return ::arrow::Status::OK();
}
bool closed() const override { return !is_open; }
// Writable
/// Forwards the bytes to the buffer and advances the written-byte counter.
::arrow::Status Write(const void* data, int64_t length) override
{
ostr.write(reinterpret_cast<const char *>(data), length);
total_length += length;
return ::arrow::Status::OK();
}
private:
WriteBuffer & ostr;
int64_t total_length = 0;
bool is_open = false;
PARQUET_DISALLOW_COPY_AND_ASSIGN(OstreamOutputStream);
};
void ParquetBlockOutputFormat::consume(Chunk chunk)
{
const Block & header = getPort(PortKind::Main).getHeader();
@ -76,10 +38,10 @@ void ParquetBlockOutputFormat::consume(Chunk chunk)
CHColumnToArrowColumn::CHChunkToArrowTable(arrow_table, header, chunk, columns_num, "Parquet");
auto sink = std::make_shared<OstreamOutputStream>(out);
if (!file_writer)
{
auto sink = std::make_shared<ArrowBufferedOutputStream>(out);
parquet::WriterProperties::Builder builder;
#if USE_SNAPPY
builder.compression(parquet::Compression::SNAPPY);
@ -112,7 +74,6 @@ void ParquetBlockOutputFormat::finalize()
}
}
void registerOutputFormatProcessorParquet(FormatFactory & factory)
{
factory.registerOutputFormatProcessor(
@ -132,7 +93,6 @@ void registerOutputFormatProcessorParquet(FormatFactory & factory)
}
#else
namespace DB
@ -143,5 +103,4 @@ void registerOutputFormatProcessorParquet(FormatFactory &)
}
}
#endif