Merge pull request #32929 from kreuzerkrieg/get_ORC_right

Stop reading incomplete stripes and skip rows.
This commit is contained in:
Kruglov Pavel 2021-12-20 15:31:48 +03:00 committed by GitHub
commit abbab7ff87
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 16 additions and 38 deletions

View File

@ -593,6 +593,7 @@ class IColumn;
M(Bool, input_format_null_as_default, true, "For text input formats initialize null fields with default values if data type of this field is not nullable", 0) \
M(Bool, input_format_arrow_import_nested, false, "Allow to insert array of structs into Nested table in Arrow input format.", 0) \
M(Bool, input_format_orc_import_nested, false, "Allow to insert array of structs into Nested table in ORC input format.", 0) \
M(Int64, input_format_orc_row_batch_size, 100'000, "Batch size when reading ORC stripes.", 0) \
M(Bool, input_format_parquet_import_nested, false, "Allow to insert array of structs into Nested table in Parquet input format.", 0) \
M(Bool, input_format_allow_seeks, true, "Allow seeks while reading in ORC/Parquet/Arrow input formats", 0) \
\

View File

@ -114,6 +114,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.arrow.low_cardinality_as_dictionary = settings.output_format_arrow_low_cardinality_as_dictionary;
format_settings.arrow.import_nested = settings.input_format_arrow_import_nested;
format_settings.orc.import_nested = settings.input_format_orc_import_nested;
format_settings.orc.row_batch_size = settings.input_format_orc_row_batch_size;
format_settings.defaults_for_omitted_fields = settings.input_format_defaults_for_omitted_fields;
format_settings.capn_proto.enum_comparing_mode = settings.format_capn_proto_enum_comparising_mode;
format_settings.seekable_read = settings.input_format_allow_seeks;

View File

@ -200,6 +200,7 @@ struct FormatSettings
struct
{
bool import_nested = false;
int64_t row_batch_size = 100'000;
} orc;
/// For capnProto format we should determine how to

View File

@ -5,7 +5,6 @@
#include <IO/ReadBufferFromMemory.h>
#include <IO/WriteHelpers.h>
#include <IO/copyData.h>
#include <arrow/io/memory.h>
#include "ArrowBufferedStreams.h"
#include "ArrowColumnToCHColumn.h"
#include <DataTypes/NestedUtils.h>
@ -38,37 +37,22 @@ Chunk ORCBlockInputFormat::generate()
if (!file_reader)
prepareReader();
std::shared_ptr<arrow::RecordBatchReader> batch_reader;
arrow::Status reader_status = file_reader->NextStripeReader(format_settings.orc.row_batch_size, include_indices, &batch_reader);
if (!reader_status.ok())
throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Failed to create batch reader: {}", reader_status.ToString());
if (!batch_reader)
{
arrow::Status reader_status = file_reader->NextStripeReader(
DBMS_DEFAULT_BUFFER_SIZE, include_indices, &batch_reader);
if (!reader_status.ok())
throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA,
"Failed to create batch reader: {}",
reader_status.ToString());
if (!batch_reader)
return res;
}
std::shared_ptr<arrow::RecordBatch> batch_result;
arrow::Status batch_status = batch_reader->ReadNext(&batch_result);
if (!batch_status.ok())
throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA,
"Error while reading batch of ORC data: {}",
batch_status.ToString());
if (!batch_result || !batch_result->num_rows())
return res;
ArrowColumnToCHColumn::NameToColumnPtr name_to_column_ptr;
for (const auto & column_name : column_names)
{
arrow::ArrayVector vec = {batch_result->GetColumnByName(column_name)};
std::shared_ptr<arrow::ChunkedArray> arrow_column = std::make_shared<arrow::ChunkedArray>(vec);
name_to_column_ptr[column_name] = arrow_column;
}
arrow_column_to_ch_column->arrowColumnsToCHChunk(res, name_to_column_ptr);
batch_reader.reset();
std::shared_ptr<arrow::Table> table;
arrow::Status table_status = batch_reader->ReadAll(&table);
if (!table_status.ok())
throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Error while reading batch of ORC data: {}", table_status.ToString());
if (!table || !table->num_rows())
return res;
arrow_column_to_ch_column->arrowTableToCHChunk(res, table);
return res;
}
@ -79,7 +63,6 @@ void ORCBlockInputFormat::resetParser()
file_reader.reset();
include_indices.clear();
stripe_current = 0;
}
static size_t countIndicesForType(std::shared_ptr<arrow::DataType> type)
@ -108,8 +91,6 @@ static size_t countIndicesForType(std::shared_ptr<arrow::DataType> type)
void ORCBlockInputFormat::prepareReader()
{
THROW_ARROW_NOT_OK(arrow::adapters::orc::ORCFileReader::Open(asArrowFile(*in, format_settings), arrow::default_memory_pool(), &file_reader));
stripe_total = file_reader->NumberOfStripes();
stripe_current = 0;
std::shared_ptr<arrow::Schema> schema;
THROW_ARROW_NOT_OK(file_reader->ReadSchema(&schema));

View File

@ -35,16 +35,10 @@ private:
std::unique_ptr<arrow::adapters::orc::ORCFileReader> file_reader;
std::shared_ptr<arrow::RecordBatchReader> batch_reader;
std::unique_ptr<ArrowColumnToCHColumn> arrow_column_to_ch_column;
std::vector<String> column_names;
int stripe_total = 0;
int stripe_current = 0;
// indices of columns to read from ORC file
std::vector<int> include_indices;