Merge pull request #37103 from bigo-sg/hive_partition_key_read

Optimization for reading Hive files when all columns to read are partition keys
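With this change, a query that touches only partition-key columns of a Hive table no longer has to open the underlying data files: the engine can synthesize const columns from each file's partition values and row count. For example (table and column names here are illustrative), a query like SELECT day, count(*) FROM hive_table GROUP BY day, where day is the table's only partition key, can be served from file metadata alone.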
Kseniia Sumarokova 2022-05-19 14:24:00 +02:00 committed by GitHub
commit d4ad138a04
3 changed files with 47 additions and 30 deletions


@@ -152,13 +152,26 @@ public:
{
if (!reader)
{
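/// A previous call may already have started emitting partition-key-only chunks
/// for the current file; keep going until its remaining row count reaches zero.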
if (current_file_remained_rows)
{
return generateChunkByPartitionKeys();
}
current_idx = source_info->next_uri_to_read.fetch_add(1);
if (current_idx >= source_info->hive_files.size())
return {};
const auto & current_file = source_info->hive_files[current_idx];
current_file = source_info->hive_files[current_idx];
current_path = current_file->getPath();
if (!to_read_block.columns() && current_file->getRows())
{
/// This is the case where all columns to read are partition keys. We can construct const columns
/// directly without reading from Hive files.
current_file_remained_rows = *(current_file->getRows());
return generateChunkByPartitionKeys();
}
String uri_with_path = hdfs_namenode_url + current_path;
auto compression = chooseCompressionMethod(current_path, compression_method);
std::unique_ptr<ReadBuffer> raw_read_buf;
@@ -261,6 +274,36 @@ public:
}
}
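/// Produce a chunk consisting solely of const partition-key columns for the current file,
/// so nothing has to be read from the data file itself.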
Chunk generateChunkByPartitionKeys()
{
size_t max_rows = getContext()->getSettings().max_block_size;
size_t rows = 0;
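/// Cap each chunk at max_block_size rows; whatever is left over is emitted on later calls.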
if (max_rows > current_file_remained_rows)
{
rows = current_file_remained_rows;
current_file_remained_rows = 0;
}
else
{
rows = max_rows;
current_file_remained_rows -= max_rows;
}
Columns cols;
auto types = source_info->partition_name_types.getTypes();
auto names = source_info->partition_name_types.getNames();
auto fields = current_file->getPartitionValues();
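/// For every partition key that appears in the output header, build a const column
/// filled with this file's partition value.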
for (size_t i = 0, sz = types.size(); i < sz; ++i)
{
if (!sample_block.has(names[i]))
continue;
auto col = types[i]->createColumnConst(rows, fields[i]);
auto col_idx = sample_block.getPositionByName(names[i]);
cols.insert(cols.begin() + col_idx, col);
}
return Chunk(std::move(cols), rows);
}
private:
std::unique_ptr<ReadBuffer> read_buf;
std::unique_ptr<QueryPipeline> pipeline;
@@ -276,8 +319,10 @@ private:
const Names & text_input_field_names;
FormatSettings format_settings;
HiveFilePtr current_file;
String current_path;
size_t current_idx = 0;
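/// Rows of the current file that still have to be emitted as partition-key-only chunks
/// (0 while the normal file-reading path is in use).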
size_t current_file_remained_rows = 0;
Poco::Logger * log = &Poco::Logger::get("StorageHive");
};
@@ -628,30 +673,6 @@ bool StorageHive::isColumnOriented() const
return format_name == "Parquet" || format_name == "ORC";
}
void StorageHive::getActualColumnsToRead(Block & sample_block, const Block & header_block, const NameSet & partition_columns) const
{
if (!isColumnOriented())
sample_block = header_block;
UInt32 erased_columns = 0;
for (const auto & column : partition_columns)
{
if (sample_block.has(column))
erased_columns++;
}
if (erased_columns == sample_block.columns())
{
for (size_t i = 0; i < header_block.columns(); ++i)
{
const auto & col = header_block.getByPosition(i);
if (!partition_columns.count(col.name))
{
sample_block.insert(col);
break;
}
}
}
}
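The removed helper above appears to have existed only to keep at least one non-partition column in sample_block when every requested column was a partition key; with the new partition-key fast path in the source, that workaround is no longer needed.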
Pipe StorageHive::read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
@@ -691,8 +712,6 @@ Pipe StorageHive::read(
sources_info->need_file_column = true;
}
getActualColumnsToRead(sample_block, header_block, NameSet{partition_names.begin(), partition_names.end()});
if (num_streams > sources_info->hive_files.size())
num_streams = sources_info->hive_files.size();
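
The fast path added above reduces to a simple idea: when the output header contains only partition keys, a chunk can be synthesized from the file's partition values and row count, capped at max_block_size rows per call. A minimal standalone sketch of that idea, in plain C++ with illustrative names and stand-in types rather than ClickHouse's Chunk/Column machinery:

/// Sketch only: stand-ins for ClickHouse's Chunk and Column types.
#include <algorithm>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

struct SketchChunk
{
    std::vector<std::pair<std::string, std::string>> const_columns; /// {partition key, value}
    size_t rows = 0;
};

/// Emit at most max_block_size rows per call; remained_rows tracks how many
/// rows of the current file are still owed across calls.
SketchChunk chunkFromPartitionKeys(
    size_t & remained_rows,
    size_t max_block_size,
    const std::vector<std::pair<std::string, std::string>> & partition_values)
{
    SketchChunk chunk;
    chunk.rows = std::min(remained_rows, max_block_size);
    remained_rows -= chunk.rows;
    for (const auto & kv : partition_values)
        chunk.const_columns.push_back(kv); /// every row in the chunk carries the same value
    return chunk;
}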


@@ -117,8 +117,6 @@ private:
const ContextPtr & context_,
PruneLevel prune_level = PruneLevel::Max) const;
void getActualColumnsToRead(Block & sample_block, const Block & header_block, const NameSet & partition_columns) const;
void lazyInitialize();
std::optional<UInt64>


@@ -375,7 +375,7 @@ def test_cache_read_bytes(started_cluster):
for i in range(10):
result = node.query(
"""
SELECT day, count(*) FROM default.demo_parquet_1 group by day order by day settings input_format_parquet_allow_missing_columns = true
SELECT * FROM default.demo_parquet_1 settings input_format_parquet_allow_missing_columns = true
"""
)
node.query("system flush logs")
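
With the fast path in place, the old query (grouping only by the partition key day) would be answered without touching file data, so the cache-read-bytes counter this test checks could legitimately stay at zero; switching the test to SELECT * keeps it exercising actual file reads through the cache.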