From 440fa9b69c5ba1b19328230c864074c74605a2bc Mon Sep 17 00:00:00 2001 From: taiyang-li <654010905@qq.com> Date: Tue, 30 Nov 2021 15:44:59 +0800 Subject: [PATCH] implement getMissingValues for ORC/Parquet/Arrow --- .../Formats/Impl/ArrowBlockInputFormat.cpp | 12 +++++++++++- src/Processors/Formats/Impl/ArrowBlockInputFormat.h | 4 ++++ .../Formats/Impl/ArrowColumnToCHColumn.cpp | 13 +++++++++---- src/Processors/Formats/Impl/ArrowColumnToCHColumn.h | 5 +++-- src/Processors/Formats/Impl/ORCBlockInputFormat.cpp | 13 ++++++++++++- src/Processors/Formats/Impl/ORCBlockInputFormat.h | 4 ++++ .../Formats/Impl/ParquetBlockInputFormat.cpp | 12 +++++++++++- .../Formats/Impl/ParquetBlockInputFormat.h | 3 +++ src/Storages/Hive/StorageHive.cpp | 2 +- 9 files changed, 58 insertions(+), 10 deletions(-) diff --git a/src/Processors/Formats/Impl/ArrowBlockInputFormat.cpp b/src/Processors/Formats/Impl/ArrowBlockInputFormat.cpp index 6f90dc18f70..b6a43591398 100644 --- a/src/Processors/Formats/Impl/ArrowBlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ArrowBlockInputFormat.cpp @@ -30,6 +30,7 @@ ArrowBlockInputFormat::ArrowBlockInputFormat(ReadBuffer & in_, const Block & hea Chunk ArrowBlockInputFormat::generate() { Chunk res; + block_missing_values.clear(); arrow::Result> batch_result; if (stream) @@ -63,7 +64,10 @@ Chunk ArrowBlockInputFormat::generate() ++record_batch_current; - arrow_column_to_ch_column->arrowTableToCHChunk(res, *table_result); + auto missing_column_indexes = arrow_column_to_ch_column->arrowTableToCHChunk(res, *table_result); + for (size_t row_idx = 0; row_idx < res.getNumRows(); ++row_idx) + for (const auto & column_idx : missing_column_indexes) + block_missing_values.setBit(column_idx, row_idx); return res; } @@ -77,6 +81,12 @@ void ArrowBlockInputFormat::resetParser() else file_reader.reset(); record_batch_current = 0; + block_missing_values.clear(); +} + +const BlockMissingValues & ArrowBlockInputFormat::getMissingValues() const +{ + return block_missing_values; } void ArrowBlockInputFormat::prepareReader() diff --git a/src/Processors/Formats/Impl/ArrowBlockInputFormat.h b/src/Processors/Formats/Impl/ArrowBlockInputFormat.h index 44e18e3f852..c0e78ff7e42 100644 --- a/src/Processors/Formats/Impl/ArrowBlockInputFormat.h +++ b/src/Processors/Formats/Impl/ArrowBlockInputFormat.h @@ -24,6 +24,8 @@ public: String getName() const override { return "ArrowBlockInputFormat"; } + const BlockMissingValues & getMissingValues() const override; + private: Chunk generate() override; @@ -39,6 +41,8 @@ private: int record_batch_total = 0; int record_batch_current = 0; + BlockMissingValues block_missing_values; + const FormatSettings format_settings; void prepareReader(); diff --git a/src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp b/src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp index 94627e59bfd..dc25ff19193 100644 --- a/src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp +++ b/src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp @@ -519,7 +519,7 @@ ArrowColumnToCHColumn::ArrowColumnToCHColumn( { } -void ArrowColumnToCHColumn::arrowTableToCHChunk(Chunk & res, std::shared_ptr & table) +std::vector ArrowColumnToCHColumn::arrowTableToCHChunk(Chunk & res, std::shared_ptr & table) { NameToColumnPtr name_to_column_ptr; for (const auto& column_name : table->ColumnNames()) @@ -528,20 +528,23 @@ void ArrowColumnToCHColumn::arrowTableToCHChunk(Chunk & res, std::shared_ptr ArrowColumnToCHColumn::arrowColumnsToCHChunk(Chunk & res, NameToColumnPtr & name_to_column_ptr) { Columns columns_list; if (name_to_column_ptr.empty()) - return; + return {}; + UInt64 num_rows = name_to_column_ptr.begin()->second->length(); columns_list.reserve(header.rows()); std::unordered_map nested_tables; + std::vector missing_column_indexes; + missing_column_indexes.reserve(header.columns()); for (size_t column_i = 0, columns = header.columns(); column_i < columns; ++column_i) { const ColumnWithTypeAndName & header_column = header.getByPosition(column_i); @@ -566,6 +569,7 @@ void ArrowColumnToCHColumn::arrowColumnsToCHChunk(Chunk & res, NameToColumnPtr & if (!read_from_nested) { + missing_column_indexes.push_back(column_i); if (defaults_for_omitted_fields) { ColumnWithTypeAndName column; @@ -606,6 +610,7 @@ void ArrowColumnToCHColumn::arrowColumnsToCHChunk(Chunk & res, NameToColumnPtr & } res.setColumns(columns_list, num_rows); + return missing_column_indexes; } } #endif diff --git a/src/Processors/Formats/Impl/ArrowColumnToCHColumn.h b/src/Processors/Formats/Impl/ArrowColumnToCHColumn.h index 0a8e22cd6da..761b389ee03 100644 --- a/src/Processors/Formats/Impl/ArrowColumnToCHColumn.h +++ b/src/Processors/Formats/Impl/ArrowColumnToCHColumn.h @@ -27,9 +27,10 @@ public: /// data from file without knowing table structure. ArrowColumnToCHColumn(const arrow::Schema & schema, const std::string & format_name, bool import_nested_, bool defaults_for_omitted_fields_); - void arrowTableToCHChunk(Chunk & res, std::shared_ptr & table); + /// Convert arrow::Table to chunk. Returns missing header columns not exists in arrow::Table. + std::vector arrowTableToCHChunk(Chunk & res, std::shared_ptr & table); - void arrowColumnsToCHChunk(Chunk & res, NameToColumnPtr & name_to_column_ptr); + std::vector arrowColumnsToCHChunk(Chunk & res, NameToColumnPtr & name_to_column_ptr); private: const Block header; diff --git a/src/Processors/Formats/Impl/ORCBlockInputFormat.cpp b/src/Processors/Formats/Impl/ORCBlockInputFormat.cpp index 7612a443bbd..55fddfd7693 100644 --- a/src/Processors/Formats/Impl/ORCBlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ORCBlockInputFormat.cpp @@ -34,6 +34,7 @@ ORCBlockInputFormat::ORCBlockInputFormat(ReadBuffer & in_, Block header_, const Chunk ORCBlockInputFormat::generate() { Chunk res; + block_missing_values.clear(); if (!file_reader) prepareReader(); @@ -67,7 +68,11 @@ Chunk ORCBlockInputFormat::generate() std::shared_ptr arrow_column = std::make_shared(vec); name_to_column_ptr[column_name] = arrow_column; } - arrow_column_to_ch_column->arrowColumnsToCHChunk(res, name_to_column_ptr); + + auto missing_column_indexes = arrow_column_to_ch_column->arrowColumnsToCHChunk(res, name_to_column_ptr); + for (size_t row_idx = 0; row_idx < res.getNumRows(); ++row_idx) + for (const auto & column_idx : missing_column_indexes) + block_missing_values.setBit(column_idx, row_idx); batch_reader.reset(); return res; @@ -80,6 +85,12 @@ void ORCBlockInputFormat::resetParser() file_reader.reset(); include_indices.clear(); stripe_current = 0; + block_missing_values.clear(); +} + +const BlockMissingValues & ORCBlockInputFormat::getMissingValues() const +{ + return block_missing_values; } static size_t countIndicesForType(std::shared_ptr type) diff --git a/src/Processors/Formats/Impl/ORCBlockInputFormat.h b/src/Processors/Formats/Impl/ORCBlockInputFormat.h index 857ec7937b7..c8747fb8d36 100644 --- a/src/Processors/Formats/Impl/ORCBlockInputFormat.h +++ b/src/Processors/Formats/Impl/ORCBlockInputFormat.h @@ -26,6 +26,8 @@ public: void resetParser() override; + const BlockMissingValues & getMissingValues() const override; + protected: Chunk generate() override; @@ -48,6 +50,8 @@ private: // indices of columns to read from ORC file std::vector include_indices; + BlockMissingValues block_missing_values; + const FormatSettings format_settings; void prepareReader(); diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp index 861818f9619..2c5799f9d7a 100644 --- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp @@ -40,6 +40,7 @@ ParquetBlockInputFormat::ParquetBlockInputFormat(ReadBuffer & in_, Block header_ Chunk ParquetBlockInputFormat::generate() { Chunk res; + block_missing_values.clear(); if (!file_reader) prepareReader(); @@ -55,7 +56,10 @@ Chunk ParquetBlockInputFormat::generate() ++row_group_current; - arrow_column_to_ch_column->arrowTableToCHChunk(res, table); + auto missing_column_indexes = arrow_column_to_ch_column->arrowTableToCHChunk(res, table); + for (size_t row_idx = 0; row_idx < res.getNumRows(); ++row_idx) + for (const auto & column_idx : missing_column_indexes) + block_missing_values.setBit(column_idx, row_idx); return res; } @@ -66,6 +70,12 @@ void ParquetBlockInputFormat::resetParser() file_reader.reset(); column_indices.clear(); row_group_current = 0; + block_missing_values.clear(); +} + +const BlockMissingValues & ParquetBlockInputFormat::getMissingValues() const +{ + return block_missing_values; } static size_t countIndicesForType(std::shared_ptr type) diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.h b/src/Processors/Formats/Impl/ParquetBlockInputFormat.h index 472aec66da3..4597a52d8ee 100644 --- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.h +++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.h @@ -23,6 +23,8 @@ public: String getName() const override { return "ParquetBlockInputFormat"; } + const BlockMissingValues & getMissingValues() const override; + private: Chunk generate() override; @@ -34,6 +36,7 @@ private: std::vector column_indices; std::unique_ptr arrow_column_to_ch_column; int row_group_current = 0; + BlockMissingValues block_missing_values; const FormatSettings format_settings; }; diff --git a/src/Storages/Hive/StorageHive.cpp b/src/Storages/Hive/StorageHive.cpp index e8fe532b492..53f827972e3 100644 --- a/src/Storages/Hive/StorageHive.cpp +++ b/src/Storages/Hive/StorageHive.cpp @@ -119,7 +119,7 @@ public: , max_block_size(max_block_size_) , sample_block(std::move(sample_block_)) , to_read_block(sample_block) - , columns_description(getColumnsDescription(sample_block_, source_info_)) + , columns_description(getColumnsDescription(sample_block, source_info)) , text_input_field_names(text_input_field_names_) , format_settings(getFormatSettings(getContext())) {