// ClickHouse/src/Processors/Formats/Impl/ArrowBlockOutputFormat.cpp
#include "ArrowBlockOutputFormat.h"
#if USE_ARROW
#include <Formats/FormatFactory.h>
#include <arrow/ipc/writer.h>
2020-05-03 18:12:14 +00:00
#include <arrow/table.h>
#include "ArrowBufferedStreams.h"
2020-05-03 02:46:36 +00:00
#include "CHColumnToArrowColumn.h"
namespace DB
{
/// Error codes used by this translation unit. Only declared here (`extern`);
/// the constant itself is defined elsewhere.
namespace ErrorCodes
{
extern const int UNKNOWN_EXCEPTION;
}
/// Creates an Arrow output format writing into `out_`.
///
/// @param out_             destination buffer; also wrapped into an ArrowBufferedOutputStream.
/// @param header_          header block passed to the IOutputFormat base.
/// @param stream_          true selects the record-batch stream writer, false the file writer
///                         (the choice is made in prepareWriter()).
/// @param format_settings_ settings copied into the object; consume() reads arrow.row_group_size from them.
ArrowBlockOutputFormat::ArrowBlockOutputFormat(
    WriteBuffer & out_,
    const Block & header_,
    bool stream_,
    const FormatSettings & format_settings_)
    : IOutputFormat(header_, out_)
    , stream(stream_)
    , format_settings(format_settings_)
    , arrow_ostream(std::make_shared<ArrowBufferedOutputStream>(out_))
{
}
/// Converts one chunk into an Arrow table and appends it to the output.
///
/// The writer is created lazily, from the schema of the first converted table.
/// Throws Exception (UNKNOWN_EXCEPTION) when Arrow reports a write failure.
void ArrowBlockOutputFormat::consume(Chunk chunk)
{
    std::shared_ptr<arrow::Table> table;
    CHColumnToArrowColumn::chChunkToArrowTable(
        table, getPort(PortKind::Main).getHeader(), chunk, chunk.getNumColumns(), "Arrow");

    if (!writer)
        prepareWriter(table->schema());

    // TODO: calculate row_group_size depending on a number of rows and table size
    const auto write_status = writer->WriteTable(*table, format_settings.arrow.row_group_size);
    if (write_status.ok())
        return;

    throw Exception{"Error while writing a table: " + write_status.ToString(), ErrorCodes::UNKNOWN_EXCEPTION};
}
void ArrowBlockOutputFormat::finalize()
{
if (writer)
{
auto status = writer->Close();
if (!status.ok())
throw Exception{"Error while closing a table: " + status.ToString(), ErrorCodes::UNKNOWN_EXCEPTION};
}
}
/// Opens the Arrow IPC writer for `schema` on top of `arrow_ostream` and stores it in `writer`.
///
/// The record-batch stream writer is used when `stream` is set, the record-batch
/// file writer otherwise. Throws Exception (UNKNOWN_EXCEPTION) if Arrow fails to open it.
void ArrowBlockOutputFormat::prepareWriter(const std::shared_ptr<arrow::Schema> & schema)
{
    // TODO: should we use arrow::ipc::IpcOptions::alignment?
    const arrow::Status open_status = stream
        ? arrow::ipc::RecordBatchStreamWriter::Open(arrow_ostream.get(), schema, &writer)
        : arrow::ipc::RecordBatchFileWriter::Open(arrow_ostream.get(), schema, &writer);

    if (open_status.ok())
        return;

    throw Exception{"Error while opening a table writer: " + open_status.ToString(), ErrorCodes::UNKNOWN_EXCEPTION};
}
2020-05-03 02:46:36 +00:00
/// Registers the two Arrow output formats in the factory:
/// "Arrow" (constructed with stream = false) and "ArrowStream" (stream = true).
void registerOutputFormatProcessorArrow(FormatFactory & factory)
{
    /// Both formats differ only in their name and in the `stream` flag passed to the constructor.
    const auto register_variant = [&factory](const char * format_name, bool is_stream)
    {
        factory.registerOutputFormatProcessor(
            format_name,
            [is_stream](WriteBuffer & buf,
                        const Block & sample,
                        FormatFactory::WriteCallback,
                        const FormatSettings & format_settings)
            {
                return std::make_shared<ArrowBlockOutputFormat>(buf, sample, is_stream, format_settings);
            });
    };

    register_variant("Arrow", false);
    register_variant("ArrowStream", true);
}
}
#else
namespace DB
{
class FormatFactory;
/// Stub for builds where USE_ARROW is off: registers nothing.
void registerOutputFormatProcessorArrow(FormatFactory & /*factory*/)
{
    // Intentionally empty.
}
}
#endif