implement getMissingValues for ORC/Parquet/Arrow

This commit is contained in:
taiyang-li 2021-11-30 15:44:59 +08:00
parent cacf516e3e
commit 440fa9b69c
9 changed files with 58 additions and 10 deletions

View File

@ -30,6 +30,7 @@ ArrowBlockInputFormat::ArrowBlockInputFormat(ReadBuffer & in_, const Block & hea
Chunk ArrowBlockInputFormat::generate()
{
Chunk res;
block_missing_values.clear();
arrow::Result<std::shared_ptr<arrow::RecordBatch>> batch_result;
if (stream)
@ -63,7 +64,10 @@ Chunk ArrowBlockInputFormat::generate()
++record_batch_current;
arrow_column_to_ch_column->arrowTableToCHChunk(res, *table_result);
auto missing_column_indexes = arrow_column_to_ch_column->arrowTableToCHChunk(res, *table_result);
for (size_t row_idx = 0; row_idx < res.getNumRows(); ++row_idx)
for (const auto & column_idx : missing_column_indexes)
block_missing_values.setBit(column_idx, row_idx);
return res;
}
@ -77,6 +81,12 @@ void ArrowBlockInputFormat::resetParser()
else
file_reader.reset();
record_batch_current = 0;
block_missing_values.clear();
}
const BlockMissingValues & ArrowBlockInputFormat::getMissingValues() const
{
return block_missing_values;
}
void ArrowBlockInputFormat::prepareReader()

View File

@ -24,6 +24,8 @@ public:
String getName() const override { return "ArrowBlockInputFormat"; }
const BlockMissingValues & getMissingValues() const override;
private:
Chunk generate() override;
@ -39,6 +41,8 @@ private:
int record_batch_total = 0;
int record_batch_current = 0;
BlockMissingValues block_missing_values;
const FormatSettings format_settings;
void prepareReader();

View File

@ -519,7 +519,7 @@ ArrowColumnToCHColumn::ArrowColumnToCHColumn(
{
}
void ArrowColumnToCHColumn::arrowTableToCHChunk(Chunk & res, std::shared_ptr<arrow::Table> & table)
std::vector<size_t> ArrowColumnToCHColumn::arrowTableToCHChunk(Chunk & res, std::shared_ptr<arrow::Table> & table)
{
NameToColumnPtr name_to_column_ptr;
for (const auto& column_name : table->ColumnNames())
@ -528,20 +528,23 @@ void ArrowColumnToCHColumn::arrowTableToCHChunk(Chunk & res, std::shared_ptr<arr
name_to_column_ptr[column_name] = arrow_column;
}
arrowColumnsToCHChunk(res, name_to_column_ptr);
return arrowColumnsToCHChunk(res, name_to_column_ptr);
}
void ArrowColumnToCHColumn::arrowColumnsToCHChunk(Chunk & res, NameToColumnPtr & name_to_column_ptr)
std::vector<size_t> ArrowColumnToCHColumn::arrowColumnsToCHChunk(Chunk & res, NameToColumnPtr & name_to_column_ptr)
{
Columns columns_list;
if (name_to_column_ptr.empty())
return;
return {};
UInt64 num_rows = name_to_column_ptr.begin()->second->length();
columns_list.reserve(header.rows());
std::unordered_map<String, BlockPtr> nested_tables;
std::vector<size_t> missing_column_indexes;
missing_column_indexes.reserve(header.columns());
for (size_t column_i = 0, columns = header.columns(); column_i < columns; ++column_i)
{
const ColumnWithTypeAndName & header_column = header.getByPosition(column_i);
@ -566,6 +569,7 @@ void ArrowColumnToCHColumn::arrowColumnsToCHChunk(Chunk & res, NameToColumnPtr &
if (!read_from_nested)
{
missing_column_indexes.push_back(column_i);
if (defaults_for_omitted_fields)
{
ColumnWithTypeAndName column;
@ -606,6 +610,7 @@ void ArrowColumnToCHColumn::arrowColumnsToCHChunk(Chunk & res, NameToColumnPtr &
}
res.setColumns(columns_list, num_rows);
return missing_column_indexes;
}
}
#endif

View File

@ -27,9 +27,10 @@ public:
/// data from file without knowing table structure.
ArrowColumnToCHColumn(const arrow::Schema & schema, const std::string & format_name, bool import_nested_, bool defaults_for_omitted_fields_);
void arrowTableToCHChunk(Chunk & res, std::shared_ptr<arrow::Table> & table);
/// Convert arrow::Table to chunk. Returns missing header columns not exists in arrow::Table.
std::vector<size_t> arrowTableToCHChunk(Chunk & res, std::shared_ptr<arrow::Table> & table);
void arrowColumnsToCHChunk(Chunk & res, NameToColumnPtr & name_to_column_ptr);
std::vector<size_t> arrowColumnsToCHChunk(Chunk & res, NameToColumnPtr & name_to_column_ptr);
private:
const Block header;

View File

@ -34,6 +34,7 @@ ORCBlockInputFormat::ORCBlockInputFormat(ReadBuffer & in_, Block header_, const
Chunk ORCBlockInputFormat::generate()
{
Chunk res;
block_missing_values.clear();
if (!file_reader)
prepareReader();
@ -67,7 +68,11 @@ Chunk ORCBlockInputFormat::generate()
std::shared_ptr<arrow::ChunkedArray> arrow_column = std::make_shared<arrow::ChunkedArray>(vec);
name_to_column_ptr[column_name] = arrow_column;
}
arrow_column_to_ch_column->arrowColumnsToCHChunk(res, name_to_column_ptr);
auto missing_column_indexes = arrow_column_to_ch_column->arrowColumnsToCHChunk(res, name_to_column_ptr);
for (size_t row_idx = 0; row_idx < res.getNumRows(); ++row_idx)
for (const auto & column_idx : missing_column_indexes)
block_missing_values.setBit(column_idx, row_idx);
batch_reader.reset();
return res;
@ -80,6 +85,12 @@ void ORCBlockInputFormat::resetParser()
file_reader.reset();
include_indices.clear();
stripe_current = 0;
block_missing_values.clear();
}
const BlockMissingValues & ORCBlockInputFormat::getMissingValues() const
{
return block_missing_values;
}
static size_t countIndicesForType(std::shared_ptr<arrow::DataType> type)

View File

@ -26,6 +26,8 @@ public:
void resetParser() override;
const BlockMissingValues & getMissingValues() const override;
protected:
Chunk generate() override;
@ -48,6 +50,8 @@ private:
// indices of columns to read from ORC file
std::vector<int> include_indices;
BlockMissingValues block_missing_values;
const FormatSettings format_settings;
void prepareReader();

View File

@ -40,6 +40,7 @@ ParquetBlockInputFormat::ParquetBlockInputFormat(ReadBuffer & in_, Block header_
Chunk ParquetBlockInputFormat::generate()
{
Chunk res;
block_missing_values.clear();
if (!file_reader)
prepareReader();
@ -55,7 +56,10 @@ Chunk ParquetBlockInputFormat::generate()
++row_group_current;
arrow_column_to_ch_column->arrowTableToCHChunk(res, table);
auto missing_column_indexes = arrow_column_to_ch_column->arrowTableToCHChunk(res, table);
for (size_t row_idx = 0; row_idx < res.getNumRows(); ++row_idx)
for (const auto & column_idx : missing_column_indexes)
block_missing_values.setBit(column_idx, row_idx);
return res;
}
@ -66,6 +70,12 @@ void ParquetBlockInputFormat::resetParser()
file_reader.reset();
column_indices.clear();
row_group_current = 0;
block_missing_values.clear();
}
const BlockMissingValues & ParquetBlockInputFormat::getMissingValues() const
{
return block_missing_values;
}
static size_t countIndicesForType(std::shared_ptr<arrow::DataType> type)

View File

@ -23,6 +23,8 @@ public:
String getName() const override { return "ParquetBlockInputFormat"; }
const BlockMissingValues & getMissingValues() const override;
private:
Chunk generate() override;
@ -34,6 +36,7 @@ private:
std::vector<int> column_indices;
std::unique_ptr<ArrowColumnToCHColumn> arrow_column_to_ch_column;
int row_group_current = 0;
BlockMissingValues block_missing_values;
const FormatSettings format_settings;
};

View File

@ -119,7 +119,7 @@ public:
, max_block_size(max_block_size_)
, sample_block(std::move(sample_block_))
, to_read_block(sample_block)
, columns_description(getColumnsDescription(sample_block_, source_info_))
, columns_description(getColumnsDescription(sample_block, source_info))
, text_input_field_names(text_input_field_names_)
, format_settings(getFormatSettings(getContext()))
{