support skipping splits in ORC and Parquet

taiyang-li 2022-04-06 16:40:22 +08:00
parent 43e8af697a
commit acb9f1632e
8 changed files with 58 additions and 29 deletions

View File

@@ -138,6 +138,7 @@ struct FormatSettings
bool import_nested = false;
bool allow_missing_columns = false;
bool case_insensitive_column_matching = false;
+ std::unordered_set<int> skip_row_groups = {};
} parquet;
struct Pretty
@@ -219,6 +220,7 @@ struct FormatSettings
bool allow_missing_columns = false;
int64_t row_batch_size = 100'000;
bool case_insensitive_column_matching = false;
+ std::unordered_set<int> skip_stripes = {};
} orc;
/// For capnProto format we should determine how to
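Note: the two new fields let a caller hand pruned chunk indices to the ORC and Parquet readers through FormatSettings. A minimal sketch of populating them (the indices and the `context` variable are hypothetical; in this commit the only producer is StorageHive further below):

// Hypothetical: suppose index analysis proved ORC stripe 1 and Parquet
// row groups 0 and 2 cannot match the query.
FormatSettings settings = getFormatSettings(context);
settings.orc.skip_stripes = {1};
settings.parquet.skip_row_groups = {0, 2};
// Input formats created with these settings will skip those chunks.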

View File

@@ -20,13 +20,12 @@ namespace ErrorCodes
}
ORCBlockInputFormat::ORCBlockInputFormat(ReadBuffer & in_, Block header_, const FormatSettings & format_settings_)
- : IInputFormat(std::move(header_), in_), format_settings(format_settings_)
+ : IInputFormat(std::move(header_), in_), format_settings(format_settings_), skip_stripes(format_settings.orc.skip_stripes)
{
}
Chunk ORCBlockInputFormat::generate()
{
- Chunk res;
block_missing_values.clear();
if (!file_reader)
@@ -35,24 +34,32 @@ Chunk ORCBlockInputFormat::generate()
if (is_stopped)
return {};
- std::shared_ptr<arrow::RecordBatchReader> batch_reader;
- auto result = file_reader->NextStripeReader(format_settings.orc.row_batch_size, include_indices);
- if (!result.ok())
- throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Failed to create batch reader: {}", result.status().ToString());
- batch_reader = std::move(result).ValueOrDie();
- if (!batch_reader)
- {
- return res;
- }
+ for (; stripe_current < stripe_total && skip_stripes.contains(stripe_current); ++stripe_current)
+ ;
- std::shared_ptr<arrow::Table> table;
- arrow::Status table_status = batch_reader->ReadAll(&table);
- if (!table_status.ok())
- throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Error while reading batch of ORC data: {}", table_status.ToString());
+ if (stripe_current >= stripe_total)
+ return {};
+ auto batch_result = file_reader->ReadStripe(stripe_current, include_indices);
+ if (!batch_result.ok())
+ throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Failed to create batch reader: {}", batch_result.status().ToString());
+ auto batch = batch_result.ValueOrDie();
+ if (!batch)
+ return {};
+ auto table_result = arrow::Table::FromRecordBatches({batch});
+ if (!table_result.ok())
+ throw ParsingException(
+ ErrorCodes::CANNOT_READ_ALL_DATA, "Error while reading batch of ORC data: {}", table_result.status().ToString());
+ auto table = table_result.ValueOrDie();
if (!table || !table->num_rows())
- return res;
+ return {};
+ ++stripe_current;
+ Chunk res;
arrow_column_to_ch_column->arrowTableToCHChunk(res, table);
/// If defaults_for_omitted_fields is true, calculate the default values from default expression for omitted fields.
/// Otherwise fill the missing columns with zero values of its type.
@@ -130,6 +137,9 @@ void ORCBlockInputFormat::prepareReader()
if (is_stopped)
return;
+ stripe_total = file_reader->NumberOfStripes();
+ stripe_current = 0;
arrow_column_to_ch_column = std::make_unique<ArrowColumnToCHColumn>(
getPort().getHeader(),
"ORC",

View File

@@ -38,6 +38,7 @@ protected:
}
private:
+ void prepareReader();
// TODO: check that this class implements every part of its parent
@@ -52,8 +53,10 @@ private:
BlockMissingValues block_missing_values;
const FormatSettings format_settings;
+ const std::unordered_set<int> & skip_stripes;
- void prepareReader();
+ int stripe_total = 0;
+ int stripe_current = 0;
std::atomic<int> is_stopped{0};
};

View File

@@ -32,7 +32,7 @@ namespace ErrorCodes
} while (false)
ParquetBlockInputFormat::ParquetBlockInputFormat(ReadBuffer & in_, Block header_, const FormatSettings & format_settings_)
- : IInputFormat(std::move(header_), in_), format_settings(format_settings_)
+ : IInputFormat(std::move(header_), in_), format_settings(format_settings_), skip_row_groups(format_settings.parquet.skip_row_groups)
{
}
@@ -47,6 +47,9 @@ Chunk ParquetBlockInputFormat::generate()
if (is_stopped)
return {};
+ for (; row_group_current < row_group_total && skip_row_groups.contains(row_group_current); ++row_group_current)
+ ;
if (row_group_current >= row_group_total)
return res;
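Note: the Parquet side needs only the skip loop, since this reader already advanced row group by row group through row_group_current. End to end, usage looks roughly like this ('in' and 'header' are assumed to be in scope; real callers go through FormatFactory::instance().getInputFormat, as the StorageHive hunk below shows):

// Hypothetical: row groups 0 and 2 were pruned in advance.
FormatSettings settings;
settings.parquet.skip_row_groups = {0, 2};
auto input_format = std::make_shared<ParquetBlockInputFormat>(in, header, settings);
// Successive generate() calls now return chunks for row groups 1, 3, 4, ...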

View File

@@ -38,13 +38,14 @@ private:
std::unique_ptr<parquet::arrow::FileReader> file_reader;
int row_group_total = 0;
+ int row_group_current = 0;
// indices of columns to read from Parquet file
std::vector<int> column_indices;
std::unique_ptr<ArrowColumnToCHColumn> arrow_column_to_ch_column;
- int row_group_current = 0;
std::vector<size_t> missing_columns;
BlockMissingValues block_missing_values;
const FormatSettings format_settings;
+ const std::unordered_set<int> & skip_row_groups;
std::atomic<int> is_stopped{0};
};

View File

@@ -139,7 +139,7 @@ void HiveOrcFile::prepareColumnMapping()
for (size_t pos = 0; pos < count; pos++)
{
/// Column names in Hive are case-insensitive.
- String columnn{type.getFieldName(pos)};
+ String column{type.getFieldName(pos)};
boost::to_lower(column);
orc_column_positions[column] = pos;
}

View File

@@ -125,9 +125,9 @@ public:
virtual const std::vector<MinMaxIndexPtr> & getSubMinMaxIndexes() const { return sub_minmax_idxes; }
- virtual void setSkipSplits(const std::set<int> & splits) { skip_splits = splits; }
+ virtual void setSkipSplits(const std::unordered_set<int> & skip_splits_) { skip_splits = skip_splits_; }
- virtual const std::set<int> & getSkipSplits() const { return skip_splits; }
+ virtual const std::unordered_set<int> & getSkipSplits() const { return skip_splits; }
inline std::string describeMinMaxIndex(const MinMaxIndexPtr & idx) const
{
@@ -157,7 +157,7 @@ protected:
MinMaxIndexPtr minmax_idx;
std::vector<MinMaxIndexPtr> sub_minmax_idxes;
/// Skip splits for this file after applying minmax index (if any)
- std::set<int> skip_splits;
+ std::unordered_set<int> skip_splits;
std::shared_ptr<HiveSettings> storage_settings;
};

View File

@@ -111,9 +111,9 @@ public:
: SourceWithProgress(getHeader(sample_block_, source_info_))
, WithContext(context_)
, source_info(std::move(source_info_))
- , hdfs_namenode_url(hdfs_namenode_url_)
+ , hdfs_namenode_url(std::move(hdfs_namenode_url_))
, format(std::move(format_))
- , compression_method(compression_method_)
+ , compression_method(std::move(compression_method_))
, max_block_size(max_block_size_)
, sample_block(std::move(sample_block_))
, columns_description(getColumnsDescription(sample_block, source_info))
@@ -121,15 +121,25 @@ public:
, format_settings(getFormatSettings(getContext()))
{
to_read_block = sample_block;
+ /// Initialize to_read_block, which is used to read data from HDFS.
for (const auto & name_type : source_info->partition_name_types)
{
if (to_read_block.has(name_type.name))
to_read_block.erase(name_type.name);
}
}
- /// Initialize format settings
- format_settings.hive_text.input_field_names = text_input_field_names;
+ FormatSettings updateFormatSettings(const HiveFilePtr & hive_file)
+ {
+ auto updated = format_settings;
+ if (format == "HiveText")
+ updated.hive_text.input_field_names = text_input_field_names;
+ else if (format == "ORC")
+ updated.orc.skip_stripes = hive_file->getSkipSplits();
+ else if (format == "Parquet")
+ updated.parquet.skip_row_groups = hive_file->getSkipSplits();
+ return updated;
+ }
String getName() const override { return "Hive"; }
@@ -188,7 +198,7 @@ public:
read_buf = std::move(remote_read_buf);
auto input_format = FormatFactory::instance().getInputFormat(
- format, *read_buf, to_read_block, getContext(), max_block_size, format_settings);
+ format, *read_buf, to_read_block, getContext(), max_block_size, updateFormatSettings(curr_file));
QueryPipelineBuilder builder;
builder.init(Pipe(input_format));
@@ -545,7 +555,7 @@ HiveFilePtr StorageHive::createHiveFileIfNeeded(
/// Load sub-file level minmax index and apply
if (hive_file->hasSubMinMaxIndex())
{
- std::set<int> skip_splits;
+ std::unordered_set<int> skip_splits;
hive_file->loadSubMinMaxIndex();
const auto & sub_minmax_idxes = hive_file->getSubMinMaxIndexes();
for (size_t i = 0; i < sub_minmax_idxes.size(); ++i)
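Note: the hunk is truncated here. A hedged sketch of how the loop presumably concludes, modeled on how ClickHouse evaluates minmax indexes elsewhere (hivefile_key_condition, its checkInHyperrectangle call, and hivefile_name_types are assumptions, not shown in this diff):

for (size_t i = 0; i < sub_minmax_idxes.size(); ++i)
{
    // Skip the split whenever the key condition proves its minmax range
    // cannot contain matching rows.
    if (!hivefile_key_condition->checkInHyperrectangle(
            sub_minmax_idxes[i]->hyperrectangle, hivefile_name_types.getTypes()).can_be_true)
        skip_splits.insert(static_cast<int>(i));
}
hive_file->setSkipSplits(skip_splits);

The resulting set then reaches the ORC or Parquet reader through updateFormatSettings() above.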