2020-04-28 19:52:22 +00:00
|
|
|
#include "ArrowBlockInputFormat.h"
|
2021-06-02 08:51:07 +00:00
|
|
|
|
2020-04-28 19:52:22 +00:00
|
|
|
#if USE_ARROW
|
|
|
|
|
|
|
|
#include <Formats/FormatFactory.h>
|
|
|
|
#include <IO/ReadBufferFromMemory.h>
|
|
|
|
#include <IO/WriteHelpers.h>
|
|
|
|
#include <IO/copyData.h>
|
|
|
|
#include <arrow/api.h>
|
|
|
|
#include <arrow/ipc/reader.h>
|
2020-07-13 18:25:49 +00:00
|
|
|
#include <arrow/result.h>
|
2020-05-05 12:56:54 +00:00
|
|
|
#include "ArrowBufferedStreams.h"
|
2020-04-28 19:52:22 +00:00
|
|
|
#include "ArrowColumnToCHColumn.h"
|
|
|
|
|
2020-07-13 01:11:35 +00:00
|
|
|
|
2020-04-28 19:52:22 +00:00
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
|
|
|
/// Error codes used by this translation unit; declared here, defined elsewhere.
namespace ErrorCodes
{
    /// Thrown from generate() when a record batch or table cannot be read.
    extern const int CANNOT_READ_ALL_DATA;
    /// Thrown when an Arrow reader cannot be opened.
    extern const int UNKNOWN_EXCEPTION;
}
|
|
|
|
|
2021-07-01 17:59:28 +00:00
|
|
|
/// Constructs the input format over `in_`, producing data shaped like `header_`.
/// `stream_` chooses between the stream reader (true) and the file reader (false);
/// the readers themselves are created lazily on the first call to generate().
ArrowBlockInputFormat::ArrowBlockInputFormat(
    ReadBuffer & in_,
    const Block & header_,
    bool stream_,
    const FormatSettings & format_settings_)
    : IInputFormat(header_, in_)
    , stream{stream_}
    , format_settings(format_settings_)
{
}
|
|
|
|
|
|
|
|
/// Reads the next Arrow record batch and converts it into a Chunk.
///
/// Returns an empty chunk when reading was stopped, when the stream reader yields
/// a null batch, or when every record batch of the file has already been consumed.
/// Throws ParsingException (CANNOT_READ_ALL_DATA) if the batch cannot be read or
/// cannot be turned into an Arrow table.
Chunk ArrowBlockInputFormat::generate()
{
    Chunk res;
    block_missing_values.clear();
    arrow::Result<std::shared_ptr<arrow::RecordBatch>> batch_result;

    /// The reader matching the current mode is created lazily on first use.
    const bool has_reader = stream ? (stream_reader != nullptr) : (file_reader != nullptr);
    if (!has_reader)
        prepareReader();

    if (is_stopped)
        return {};

    if (stream)
    {
        batch_result = stream_reader->Next();

        /// A successful read that produced no batch means there is nothing left to read.
        const bool end_of_stream = batch_result.ok() && !(*batch_result);
        if (end_of_stream)
            return res;
    }
    else
    {
        /// All record batches of the file have been handed out already.
        if (record_batch_current >= record_batch_total)
            return res;

        batch_result = file_reader->ReadRecordBatch(record_batch_current);
    }

    if (!batch_result.ok())
    {
        throw ParsingException(
            ErrorCodes::CANNOT_READ_ALL_DATA,
            "Error while reading batch of Arrow data: {}",
            batch_result.status().ToString());
    }

    auto table_result = arrow::Table::FromRecordBatches({*batch_result});
    if (!table_result.ok())
    {
        throw ParsingException(
            ErrorCodes::CANNOT_READ_ALL_DATA,
            "Error while reading batch of Arrow data: {}",
            table_result.status().ToString());
    }

    ++record_batch_current;

    arrow_column_to_ch_column->arrowTableToCHChunk(res, *table_result);

    /// If defaults_for_omitted_fields is true, calculate the default values from default expression for omitted fields.
    /// Otherwise fill the missing columns with zero values of its type.
    if (format_settings.defaults_for_omitted_fields)
    {
        const auto rows_in_chunk = res.getNumRows();
        for (size_t row = 0; row < rows_in_chunk; ++row)
        {
            for (const auto & missing_column : missing_columns)
                block_missing_values.setBit(missing_column, row);
        }
    }

    return res;
}
|
|
|
|
|
|
|
|
void ArrowBlockInputFormat::resetParser()
|
|
|
|
{
|
|
|
|
IInputFormat::resetParser();
|
2020-05-04 00:52:28 +00:00
|
|
|
|
2020-05-21 04:07:47 +00:00
|
|
|
if (stream)
|
|
|
|
stream_reader.reset();
|
|
|
|
else
|
|
|
|
file_reader.reset();
|
2021-04-21 05:47:08 +00:00
|
|
|
record_batch_current = 0;
|
2021-11-30 07:44:59 +00:00
|
|
|
block_missing_values.clear();
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Returns the missing-values mask that generate() cleared and refilled for the
/// most recently produced chunk.
const BlockMissingValues & ArrowBlockInputFormat::getMissingValues() const
{
    const BlockMissingValues & current_mask = block_missing_values;
    return current_mask;
}
|
|
|
|
|
2021-12-15 11:30:57 +00:00
|
|
|
/// Opens an Arrow IPC stream reader on top of `in`.
/// Throws Exception (UNKNOWN_EXCEPTION) if Arrow fails to open the stream.
static std::shared_ptr<arrow::RecordBatchReader> createStreamReader(ReadBuffer & in)
{
    auto arrow_input = std::make_unique<ArrowInputStreamFromReadBuffer>(in);
    auto open_result = arrow::ipc::RecordBatchStreamReader::Open(std::move(arrow_input));

    if (open_result.ok())
        return *open_result;

    throw Exception(
        ErrorCodes::UNKNOWN_EXCEPTION,
        "Error while opening a table: {}",
        open_result.status().ToString());
}
|
2021-06-07 15:15:58 +00:00
|
|
|
|
2021-12-15 11:30:57 +00:00
|
|
|
/// Opens an Arrow IPC file reader on top of `in`.
///
/// The buffer is first adapted with asArrowFile(); if `is_stopped` is non-zero
/// after that call, nullptr is returned and no reader is opened.
/// Throws Exception (UNKNOWN_EXCEPTION) if Arrow fails to open the file.
static std::shared_ptr<arrow::ipc::RecordBatchFileReader> createFileReader(
    ReadBuffer & in,
    const FormatSettings & format_settings,
    std::atomic<int> & is_stopped)
{
    auto arrow_file = asArrowFile(in, format_settings, is_stopped);

    /// Callers must be ready to receive a null reader when reading was stopped.
    if (is_stopped)
        return nullptr;

    auto open_result = arrow::ipc::RecordBatchFileReader::Open(std::move(arrow_file));

    if (open_result.ok())
        return *open_result;

    throw Exception(
        ErrorCodes::UNKNOWN_EXCEPTION,
        "Error while opening a table: {}",
        open_result.status().ToString());
}
|
|
|
|
|
|
|
|
|
2020-05-04 00:52:28 +00:00
|
|
|
void ArrowBlockInputFormat::prepareReader()
|
|
|
|
{
|
2021-06-07 15:15:58 +00:00
|
|
|
std::shared_ptr<arrow::Schema> schema;
|
2020-05-21 04:07:47 +00:00
|
|
|
if (stream)
|
2020-07-13 18:25:49 +00:00
|
|
|
{
|
2021-12-15 11:30:57 +00:00
|
|
|
stream_reader = createStreamReader(*in);
|
2021-06-07 15:15:58 +00:00
|
|
|
schema = stream_reader->schema();
|
2020-07-13 18:25:49 +00:00
|
|
|
}
|
2020-05-21 04:07:47 +00:00
|
|
|
else
|
2020-07-13 18:25:49 +00:00
|
|
|
{
|
2021-12-15 11:30:57 +00:00
|
|
|
file_reader = createFileReader(*in, format_settings, is_stopped);
|
|
|
|
if (!file_reader)
|
2021-12-27 19:42:56 +00:00
|
|
|
return;
|
2021-06-07 15:15:58 +00:00
|
|
|
schema = file_reader->schema();
|
2020-07-13 18:25:49 +00:00
|
|
|
}
|
2020-05-21 04:07:47 +00:00
|
|
|
|
2021-11-30 06:52:26 +00:00
|
|
|
arrow_column_to_ch_column = std::make_unique<ArrowColumnToCHColumn>(
|
2021-12-02 08:14:25 +00:00
|
|
|
getPort().getHeader(), "Arrow", format_settings.arrow.import_nested, format_settings.arrow.allow_missing_columns);
|
|
|
|
missing_columns = arrow_column_to_ch_column->getMissingColumns(*schema);
|
2021-06-07 15:15:58 +00:00
|
|
|
|
2020-05-21 04:07:47 +00:00
|
|
|
if (stream)
|
|
|
|
record_batch_total = -1;
|
|
|
|
else
|
|
|
|
record_batch_total = file_reader->num_record_batches();
|
|
|
|
|
2020-05-04 14:28:36 +00:00
|
|
|
record_batch_current = 0;
|
2020-04-28 19:52:22 +00:00
|
|
|
}
|
|
|
|
|
2021-12-15 11:30:57 +00:00
|
|
|
/// Constructs a schema reader over `in_`; `stream_` selects whether readSchema()
/// opens the input with the stream reader (true) or the file reader (false).
ArrowSchemaReader::ArrowSchemaReader(
    ReadBuffer & in_,
    bool stream_,
    const FormatSettings & format_settings_)
    : ISchemaReader(in_)
    , stream(stream_)
    , format_settings(format_settings_)
{
}
|
|
|
|
|
|
|
|
/// Opens the input with the reader matching the current mode, takes its Arrow
/// schema and converts it into a list of column names and types.
NamesAndTypesList ArrowSchemaReader::readSchema()
{
    std::shared_ptr<arrow::Schema> schema;

    if (!stream)
    {
        /// createFileReader() requires a stop flag; a local one that stays zero is used here.
        std::atomic<int> is_stopped{0};
        auto reader = createFileReader(in, format_settings, is_stopped);
        schema = reader->schema();
    }
    else
    {
        auto reader = createStreamReader(in);
        schema = reader->schema();
    }

    const char * format_name = stream ? "ArrowStream" : "Arrow";
    auto header = ArrowColumnToCHColumn::arrowSchemaToCHHeader(*schema, format_name);
    return header.getNamesAndTypesList();
}
|
|
|
|
|
2021-10-11 16:11:50 +00:00
|
|
|
/// Registers the "Arrow" (file reader) and "ArrowStream" (stream reader) input
/// formats in the factory; "Arrow" is additionally marked as column oriented.
void registerInputFormatArrow(FormatFactory & factory)
{
    /// Creator for the file-based format: the `stream` constructor flag is false.
    const auto create_arrow = [](ReadBuffer & buf,
                                 const Block & sample,
                                 const RowInputFormatParams & /* params */,
                                 const FormatSettings & format_settings)
    {
        return std::make_shared<ArrowBlockInputFormat>(buf, sample, false, format_settings);
    };

    /// Creator for the streaming format: the `stream` constructor flag is true.
    const auto create_arrow_stream = [](ReadBuffer & buf,
                                        const Block & sample,
                                        const RowInputFormatParams & /* params */,
                                        const FormatSettings & format_settings)
    {
        return std::make_shared<ArrowBlockInputFormat>(buf, sample, true, format_settings);
    };

    factory.registerInputFormat("Arrow", create_arrow);
    factory.markFormatAsColumnOriented("Arrow");
    factory.registerInputFormat("ArrowStream", create_arrow_stream);
}
|
|
|
|
|
2021-12-15 11:30:57 +00:00
|
|
|
/// Registers schema readers for the "Arrow" (file reader) and "ArrowStream"
/// (stream reader) formats in the factory.
void registerArrowSchemaReader(FormatFactory & factory)
{
    /// Schema reader for the file-based format: the `stream` constructor flag is false.
    const auto create_arrow_reader = [](ReadBuffer & buf, const FormatSettings & settings, ContextPtr)
    {
        return std::make_shared<ArrowSchemaReader>(buf, false, settings);
    };

    /// Schema reader for the streaming format: the `stream` constructor flag is true.
    const auto create_arrow_stream_reader = [](ReadBuffer & buf, const FormatSettings & settings, ContextPtr)
    {
        return std::make_shared<ArrowSchemaReader>(buf, true, settings);
    };

    factory.registerSchemaReader("Arrow", create_arrow_reader);
    factory.registerSchemaReader("ArrowStream", create_arrow_stream_reader);
}
|
2020-04-28 19:52:22 +00:00
|
|
|
}
|
|
|
|
#else
|
|
|
|
|
|
|
|
/// Fallback used when USE_ARROW is off: the registration entry points still exist
/// but register nothing.
namespace DB
{

class FormatFactory;

/// No-op replacement for the Arrow input format registration.
void registerInputFormatArrow(FormatFactory &) {}

/// No-op replacement for the Arrow schema reader registration.
void registerArrowSchemaReader(FormatFactory &)
{
}

}
|
|
|
|
|
2020-05-02 19:32:21 +00:00
|
|
|
#endif
|