2020-05-03 02:46:36 +00:00
|
|
|
#include "ArrowBlockOutputFormat.h"
|
|
|
|
|
|
|
|
#if USE_ARROW
|
|
|
|
|
|
|
|
#include <Formats/FormatFactory.h>
|
|
|
|
#include <arrow/ipc/writer.h>
|
2020-05-03 18:12:14 +00:00
|
|
|
#include <arrow/table.h>
|
2020-07-13 18:25:49 +00:00
|
|
|
#include <arrow/result.h>
|
2020-05-03 18:12:14 +00:00
|
|
|
#include "ArrowBufferedStreams.h"
|
2020-05-03 02:46:36 +00:00
|
|
|
#include "CHColumnToArrowColumn.h"
|
|
|
|
|
2020-07-13 01:11:35 +00:00
|
|
|
|
2020-05-03 02:46:36 +00:00
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
namespace ErrorCodes
{
    /// Error code attached to every Arrow-side failure (open / write / close) reported below.
    extern const int UNKNOWN_EXCEPTION;
}
|
|
|
|
|
2020-05-21 04:07:47 +00:00
|
|
|
/// Builds an Arrow IPC output format that writes into `out_`.
/// `stream_` chooses the IPC stream format (true) or the IPC file format (false) when the
/// writer is created in prepareWriter(); the writer itself is created lazily in consume(),
/// once the schema of the first converted table is known.
ArrowBlockOutputFormat::ArrowBlockOutputFormat(WriteBuffer & out_, const Block & header_, bool stream_, const FormatSettings & format_settings_)
    : IOutputFormat(header_, out_)
    , stream(stream_)
    , format_settings(format_settings_)
    , arrow_ostream(std::make_shared<ArrowBufferedOutputStream>(out_))
{
    /// All state is set up in the member-initializer list above.
}
|
|
|
|
|
|
|
|
/// Converts one ClickHouse chunk to an Arrow table and hands it to the Arrow IPC writer.
/// Both the CH->Arrow converter (built from the output port header) and the writer
/// (built from the first converted table's schema) are created on first use.
/// Throws UNKNOWN_EXCEPTION when Arrow reports a write failure.
void ArrowBlockOutputFormat::consume(Chunk chunk)
{
    // Lazily build the converter; it is reused for all subsequent chunks.
    if (ch_column_to_arrow_column == nullptr)
    {
        const auto & port_header = getPort(PortKind::Main).getHeader();
        const bool dictionary_for_low_cardinality = format_settings.arrow.low_cardinality_as_dictionary;
        ch_column_to_arrow_column = std::make_unique<CHColumnToArrowColumn>(port_header, "Arrow", dictionary_for_low_cardinality);
    }

    std::shared_ptr<arrow::Table> table;
    const size_t num_columns = chunk.getNumColumns();
    ch_column_to_arrow_column->chChunkToArrowTable(table, chunk, num_columns);

    // The writer needs a schema, which is only available after the first conversion.
    if (!writer)
        prepareWriter(table->schema());

    // TODO: calculate row_group_size depending on a number of rows and table size
    const auto write_status = writer->WriteTable(*table, format_settings.arrow.row_group_size);
    if (write_status.ok())
        return;

    throw Exception(ErrorCodes::UNKNOWN_EXCEPTION,
        "Error while writing a table: {}", write_status.ToString());
}
|
|
|
|
|
2021-11-11 18:09:21 +00:00
|
|
|
void ArrowBlockOutputFormat::finalizeImpl()
|
2020-05-03 02:46:36 +00:00
|
|
|
{
|
2020-08-06 05:58:08 +00:00
|
|
|
if (!writer)
|
2020-05-03 02:46:36 +00:00
|
|
|
{
|
2020-08-06 05:58:08 +00:00
|
|
|
const Block & header = getPort(PortKind::Main).getHeader();
|
|
|
|
|
2020-08-07 07:40:05 +00:00
|
|
|
consume(Chunk(header.getColumns(), 0));
|
2020-05-03 02:46:36 +00:00
|
|
|
}
|
2020-08-06 05:58:08 +00:00
|
|
|
|
|
|
|
auto status = writer->Close();
|
|
|
|
if (!status.ok())
|
|
|
|
throw Exception(ErrorCodes::UNKNOWN_EXCEPTION,
|
|
|
|
"Error while closing a table: {}", status.ToString());
|
2020-05-03 02:46:36 +00:00
|
|
|
}
|
|
|
|
|
2020-05-21 04:07:47 +00:00
|
|
|
/// Creates the Arrow IPC writer for `schema` on top of `arrow_ostream`.
/// Uses the IPC stream format when `stream` is set and the IPC file format otherwise.
/// Throws UNKNOWN_EXCEPTION if Arrow fails to create the writer.
void ArrowBlockOutputFormat::prepareWriter(const std::shared_ptr<arrow::Schema> & schema)
{
    // TODO: should we use arrow::ipc::IpcOptions::alignment?
    // Both factories return the same Result type, so initialize the Result once instead of
    // default-constructing it (which stores an "uninitialized" error status) and reassigning.
    arrow::Result<std::shared_ptr<arrow::ipc::RecordBatchWriter>> writer_status = stream
        ? arrow::ipc::MakeStreamWriter(arrow_ostream.get(), schema)
        : arrow::ipc::MakeFileWriter(arrow_ostream.get(), schema);

    if (!writer_status.ok())
        throw Exception(ErrorCodes::UNKNOWN_EXCEPTION,
            "Error while opening a table writer: {}", writer_status.status().ToString());

    // Safe: ok() was checked above. Moving the shared_ptr out of the Result avoids an extra
    // atomic refcount increment/decrement compared to copying it.
    writer = writer_status.MoveValueUnsafe();
}
|
|
|
|
|
2021-10-11 16:11:50 +00:00
|
|
|
/// Registers both Arrow output formats with the factory:
///   "Arrow"       - ArrowBlockOutputFormat with stream = false (IPC file format),
///   "ArrowStream" - ArrowBlockOutputFormat with stream = true  (IPC stream format).
/// Each one is marked as not supporting append.
void registerOutputFormatArrow(FormatFactory & factory)
{
    struct ArrowFormatVariant
    {
        const char * name;
        bool is_stream;
    };

    // Order matters only for readability; it matches the historical registration order.
    const ArrowFormatVariant variants[] = {
        {"Arrow", false},
        {"ArrowStream", true},
    };

    for (const auto & variant : variants)
    {
        const bool is_stream = variant.is_stream;
        factory.registerOutputFormat(
            variant.name,
            [is_stream](WriteBuffer & buf,
               const Block & sample,
               const RowOutputFormatParams &,
               const FormatSettings & format_settings)
            {
                return std::make_shared<ArrowBlockOutputFormat>(buf, sample, is_stream, format_settings);
            });
        factory.markFormatHasNoAppendSupport(variant.name);
    }
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
#else
|
|
|
|
|
|
|
|
namespace DB
{
class FormatFactory;

/// Arrow support is compiled out (USE_ARROW is disabled), so there is nothing to register.
void registerOutputFormatArrow(FormatFactory & /* factory */)
{
    /// Intentionally empty.
}
}
|
|
|
|
|
|
|
|
#endif
|