diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 00ab0b73807..1332d844ff3 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -593,6 +593,7 @@ class IColumn; M(Bool, input_format_null_as_default, true, "For text input formats initialize null fields with default values if data type of this field is not nullable", 0) \ M(Bool, input_format_arrow_import_nested, false, "Allow to insert array of structs into Nested table in Arrow input format.", 0) \ M(Bool, input_format_orc_import_nested, false, "Allow to insert array of structs into Nested table in ORC input format.", 0) \ + M(Int64, input_format_orc_row_batch_size, 100'000, "Batch size when reading ORC stripes.", 0) \ M(Bool, input_format_parquet_import_nested, false, "Allow to insert array of structs into Nested table in Parquet input format.", 0) \ M(Bool, input_format_allow_seeks, true, "Allow seeks while reading in ORC/Parquet/Arrow input formats", 0) \ \ diff --git a/src/Formats/FormatFactory.cpp b/src/Formats/FormatFactory.cpp index 75b096de425..09e0876bb4f 100644 --- a/src/Formats/FormatFactory.cpp +++ b/src/Formats/FormatFactory.cpp @@ -114,6 +114,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings) format_settings.arrow.low_cardinality_as_dictionary = settings.output_format_arrow_low_cardinality_as_dictionary; format_settings.arrow.import_nested = settings.input_format_arrow_import_nested; format_settings.orc.import_nested = settings.input_format_orc_import_nested; + format_settings.orc.row_batch_size = settings.input_format_orc_row_batch_size; format_settings.defaults_for_omitted_fields = settings.input_format_defaults_for_omitted_fields; format_settings.capn_proto.enum_comparing_mode = settings.format_capn_proto_enum_comparising_mode; format_settings.seekable_read = settings.input_format_allow_seeks; diff --git a/src/Formats/FormatSettings.h b/src/Formats/FormatSettings.h index a18a20bac7b..909b173007a 100644 --- a/src/Formats/FormatSettings.h +++ b/src/Formats/FormatSettings.h @@ -200,6 +200,7 @@ struct FormatSettings struct { bool import_nested = false; + int64_t row_batch_size = 100'000; } orc; /// For capnProto format we should determine how to diff --git a/src/Processors/Formats/Impl/ORCBlockInputFormat.cpp b/src/Processors/Formats/Impl/ORCBlockInputFormat.cpp index 8768e2f5f14..c645595919e 100644 --- a/src/Processors/Formats/Impl/ORCBlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ORCBlockInputFormat.cpp @@ -5,7 +5,6 @@ #include #include #include -#include #include "ArrowBufferedStreams.h" #include "ArrowColumnToCHColumn.h" #include @@ -38,37 +37,22 @@ Chunk ORCBlockInputFormat::generate() if (!file_reader) prepareReader(); + std::shared_ptr batch_reader; + arrow::Status reader_status = file_reader->NextStripeReader(format_settings.orc.row_batch_size, include_indices, &batch_reader); + if (!reader_status.ok()) + throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Failed to create batch reader: {}", reader_status.ToString()); if (!batch_reader) - { - arrow::Status reader_status = file_reader->NextStripeReader( - DBMS_DEFAULT_BUFFER_SIZE, include_indices, &batch_reader); - if (!reader_status.ok()) - throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, - "Failed to create batch reader: {}", - reader_status.ToString()); - if (!batch_reader) - return res; - } - - std::shared_ptr batch_result; - arrow::Status batch_status = batch_reader->ReadNext(&batch_result); - if (!batch_status.ok()) - throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, - "Error while reading batch of ORC data: {}", - batch_status.ToString()); - - if (!batch_result || !batch_result->num_rows()) return res; - ArrowColumnToCHColumn::NameToColumnPtr name_to_column_ptr; - for (const auto & column_name : column_names) - { - arrow::ArrayVector vec = {batch_result->GetColumnByName(column_name)}; - std::shared_ptr arrow_column = std::make_shared(vec); - name_to_column_ptr[column_name] = arrow_column; - } - arrow_column_to_ch_column->arrowColumnsToCHChunk(res, name_to_column_ptr); - batch_reader.reset(); + std::shared_ptr table; + arrow::Status table_status = batch_reader->ReadAll(&table); + if (!table_status.ok()) + throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Error while reading batch of ORC data: {}", table_status.ToString()); + + if (!table || !table->num_rows()) + return res; + + arrow_column_to_ch_column->arrowTableToCHChunk(res, table); return res; } @@ -79,7 +63,6 @@ void ORCBlockInputFormat::resetParser() file_reader.reset(); include_indices.clear(); - stripe_current = 0; } static size_t countIndicesForType(std::shared_ptr type) @@ -108,8 +91,6 @@ static size_t countIndicesForType(std::shared_ptr type) void ORCBlockInputFormat::prepareReader() { THROW_ARROW_NOT_OK(arrow::adapters::orc::ORCFileReader::Open(asArrowFile(*in, format_settings), arrow::default_memory_pool(), &file_reader)); - stripe_total = file_reader->NumberOfStripes(); - stripe_current = 0; std::shared_ptr schema; THROW_ARROW_NOT_OK(file_reader->ReadSchema(&schema)); diff --git a/src/Processors/Formats/Impl/ORCBlockInputFormat.h b/src/Processors/Formats/Impl/ORCBlockInputFormat.h index 857ec7937b7..639aaee73bb 100644 --- a/src/Processors/Formats/Impl/ORCBlockInputFormat.h +++ b/src/Processors/Formats/Impl/ORCBlockInputFormat.h @@ -35,16 +35,10 @@ private: std::unique_ptr file_reader; - std::shared_ptr batch_reader; - std::unique_ptr arrow_column_to_ch_column; std::vector column_names; - int stripe_total = 0; - - int stripe_current = 0; - // indices of columns to read from ORC file std::vector include_indices;