ClickHouse/src/Processors/Formats/Impl/ORCBlockInputFormat.cpp

164 lines
5.2 KiB
C++
Raw Normal View History

2019-08-21 14:19:47 +00:00
#include "ORCBlockInputFormat.h"
#if USE_ORC
#include <Formats/FormatFactory.h>
#include <IO/ReadBufferFromMemory.h>
#include <IO/WriteHelpers.h>
#include <IO/copyData.h>
#include <arrow/io/memory.h>
#include "ArrowBufferedStreams.h"
2019-08-21 14:19:47 +00:00
#include "ArrowColumnToCHColumn.h"
#include <DataTypes/NestedUtils.h>
2019-08-21 14:19:47 +00:00
namespace DB
{
2020-02-25 18:20:08 +00:00
2020-02-25 18:10:48 +00:00
/// Error codes used by the ORC input format. They are only declared here (extern).
namespace ErrorCodes
{
    /// Used when opening the ORC file or reading its schema fails.
    extern const int BAD_ARGUMENTS;
    /// Used when a stripe reader cannot be created or a record batch cannot be read.
    extern const int CANNOT_READ_ALL_DATA;
}
2019-08-21 14:19:47 +00:00
/// Creates the ORC input format on top of the read buffer `in_`.
/// `header_` is moved into the IInputFormat base and `format_settings_` is stored in the
/// `format_settings` member. No ORC reader is opened here: generate() calls prepareReader()
/// when `file_reader` is not set yet.
ORCBlockInputFormat::ORCBlockInputFormat(
    ReadBuffer & in_,
    Block header_,
    const FormatSettings & format_settings_)
    : IInputFormat(std::move(header_), in_)
    , format_settings(format_settings_)
{
}
2019-08-21 14:19:47 +00:00
2020-02-25 18:20:08 +00:00
/// Produces the next chunk of rows read from the ORC file.
///
/// The file reader is prepared lazily on the first call. Data is then read stripe by stripe:
/// a stripe reader is requested with a batch size of DBMS_DEFAULT_BUFFER_SIZE and is kept in
/// `batch_reader` between calls until it stops returning rows, so a stripe that spans several
/// batches is read completely. Only then is the next stripe reader requested.
///
/// Returns an empty Chunk when the file reader has no more stripes to offer.
/// Throws ParsingException (CANNOT_READ_ALL_DATA) if a stripe reader cannot be created,
/// a batch cannot be read, or a requested column is absent from the batch.
Chunk ORCBlockInputFormat::generate()
{
    Chunk res;

    if (!file_reader)
        prepareReader();

    std::shared_ptr<arrow::RecordBatch> batch_result;
    while (true)
    {
        if (!batch_reader)
        {
            auto result = file_reader->NextStripeReader(DBMS_DEFAULT_BUFFER_SIZE, include_indices);
            if (!result.ok())
                throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Failed to create batch reader: {}", result.status().ToString());

            batch_reader = std::move(result).ValueOrDie();

            /// A null reader means there are no stripes left: end of data.
            if (!batch_reader)
                return res;
        }

        arrow::Status batch_status = batch_reader->ReadNext(&batch_result);
        if (!batch_status.ok())
            throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA,
                                   "Error while reading batch of ORC data: {}",
                                   batch_status.ToString());

        if (batch_result && batch_result->num_rows())
            break;

        /// The current stripe gave no (more) rows: drop its reader and try the next stripe.
        /// Resetting the reader after every single batch would skip the rest of a large stripe.
        batch_reader.reset();
    }

    ArrowColumnToCHColumn::NameToColumnPtr name_to_column_ptr;
    for (const auto & column_name : column_names)
    {
        auto arrow_array = batch_result->GetColumnByName(column_name);

        /// A null array cannot be wrapped into a ChunkedArray, so report it explicitly.
        if (!arrow_array)
            throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA,
                                   "Column '{}' is missing in the batch of ORC data",
                                   column_name);

        arrow::ArrayVector vec = {std::move(arrow_array)};
        name_to_column_ptr[column_name] = std::make_shared<arrow::ChunkedArray>(vec);
    }

    arrow_column_to_ch_column->arrowColumnsToCHChunk(res, name_to_column_ptr);
    return res;
}
void ORCBlockInputFormat::resetParser()
{
IInputFormat::resetParser();
file_reader.reset();
include_indices.clear();
stripe_current = 0;
}
2021-05-20 13:47:12 +00:00
static size_t countIndicesForType(std::shared_ptr<arrow::DataType> type)
{
if (type->id() == arrow::Type::LIST)
return countIndicesForType(static_cast<arrow::ListType *>(type.get())->value_type()) + 1;
if (type->id() == arrow::Type::STRUCT)
{
2021-05-27 19:01:06 +00:00
int indices = 1;
auto * struct_type = static_cast<arrow::StructType *>(type.get());
for (int i = 0; i != struct_type->num_fields(); ++i)
indices += countIndicesForType(struct_type->field(i)->type());
2021-05-27 19:01:06 +00:00
return indices;
}
2021-05-27 19:01:06 +00:00
if (type->id() == arrow::Type::MAP)
{
auto * map_type = static_cast<arrow::MapType *>(type.get());
2021-05-27 19:10:45 +00:00
return countIndicesForType(map_type->key_type()) + countIndicesForType(map_type->item_type());
2021-05-27 19:01:06 +00:00
}
return 1;
}
void ORCBlockInputFormat::prepareReader()
{
2021-11-30 16:39:29 +00:00
auto result = arrow::adapters::orc::ORCFileReader::Open(asArrowFile(*in, format_settings), arrow::default_memory_pool());
if (!result.ok())
throw Exception(result.status().ToString(), ErrorCodes::BAD_ARGUMENTS);
2021-12-01 11:19:24 +00:00
file_reader = std::move(result).ValueOrDie();
stripe_total = file_reader->NumberOfStripes();
stripe_current = 0;
2021-11-30 16:39:29 +00:00
auto read_schema_result = file_reader->ReadSchema();
if (!read_schema_result.ok())
throw Exception(read_schema_result.status().ToString(), ErrorCodes::BAD_ARGUMENTS);
2021-12-01 11:19:24 +00:00
std::shared_ptr<arrow::Schema> schema = std::move(read_schema_result).ValueOrDie();
arrow_column_to_ch_column = std::make_unique<ArrowColumnToCHColumn>(getPort().getHeader(), "ORC", format_settings.orc.import_nested);
std::unordered_set<String> nested_table_names;
if (format_settings.orc.import_nested)
nested_table_names = Nested::getAllTableNames(getPort().getHeader());
2021-06-07 15:15:58 +00:00
/// In ReadStripe column indices should be started from 1,
/// because 0 indicates to select all columns.
int index = 1;
for (int i = 0; i < schema->num_fields(); ++i)
{
/// LIST type require 2 indices, STRUCT - the number of elements + 1,
/// so we should recursively count the number of indices we need for this type.
int indexes_count = countIndicesForType(schema->field(i)->type());
const auto & name = schema->field(i)->name();
if (getPort().getHeader().has(name) || nested_table_names.contains(name))
{
2021-10-31 19:53:24 +00:00
column_names.push_back(name);
for (int j = 0; j != indexes_count; ++j)
include_indices.push_back(index + j);
}
index += indexes_count;
}
2020-02-25 18:20:08 +00:00
}
2021-10-11 16:11:50 +00:00
/// Registers the "ORC" input format in `factory` and marks it as column-oriented.
void registerInputFormatORC(FormatFactory & factory)
{
    /// Creator callback: builds an ORCBlockInputFormat from the buffer, the sample block
    /// and the format settings; the row-input params argument is not used.
    auto create_orc_input = [](ReadBuffer & read_buffer,
                               const Block & header,
                               const RowInputFormatParams & /* params */,
                               const FormatSettings & format_settings)
    {
        return std::make_shared<ORCBlockInputFormat>(read_buffer, header, format_settings);
    };

    factory.registerInputFormat("ORC", create_orc_input);
    factory.markFormatAsColumnOriented("ORC");
}
2019-08-21 14:19:47 +00:00
}
#else
namespace DB
{

class FormatFactory;

/// Variant compiled when USE_ORC is off: there is no ORC format to register,
/// so the function intentionally does nothing.
void registerInputFormatORC(FormatFactory & /* factory */)
{
}

}
#endif