add minmax index for hivengine; remove libhdfspp dependency for hive engine

This commit is contained in:
taiyang-li 2022-01-18 18:47:25 +08:00
parent 12d935f3f8
commit dec083ab44
3 changed files with 194 additions and 43 deletions

View File

@ -1,22 +1,37 @@
<<<<<<< HEAD
#include <Storages/Hive/HiveFile.h>
=======
#include <Common/config.h>
>>>>>>> d9558cbca4... add minmax index for hivengine; remove libhdfspp dependency for hive engine
#if USE_HIVE
#include <boost/algorithm/string/case_conv.hpp>
<<<<<<< HEAD
#include <fmt/core.h>
#include <arrow/api.h>
=======
#include <boost/algorithm/string/classification.hpp>
#include <boost/algorithm/string/trim.hpp>
#include <arrow/adapters/orc/adapter.h>
#include <arrow/io/memory.h>
>>>>>>> d9558cbca4... add minmax index for hivengine; remove libhdfspp dependency for hive engine
#include <arrow/io/api.h>
#include <arrow/api.h>
#include <arrow/status.h>
#include <orc/OrcFile.hh>
#include <orc/Reader.hh>
#include <parquet/arrow/reader.h>
#include <parquet/file_reader.h>
#include <parquet/statistics.h>
#include <fmt/core.h>
#include <Core/Types.h>
#include <Common/Exception.h>
#include <Common/typeid_cast.h>
#include <Formats/FormatFactory.h>
#include <Processors/Formats/Impl/ArrowBufferedStreams.h>
#include <Storages/HDFS/ReadBufferFromHDFS.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/KeyCondition.h>
@ -28,10 +43,22 @@ namespace ErrorCodes
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
#define THROW_ARROW_NOT_OK(status) \
do \
{ \
if (::arrow::Status _s = (status); !_s.ok()) \
throw Exception(_s.ToString(), ErrorCodes::BAD_ARGUMENTS); \
} while (false)
template <class FieldType, class StatisticsType>
Range createRangeFromOrcStatistics(const StatisticsType * stats)
{
/// We must check if there are minimum or maximum values in statistics in case of
/// null values or NaN/Inf values of double type.
if (stats->hasMinimum() && stats->hasMaximum())
{
@ -117,65 +144,193 @@ Range HiveOrcFile::buildRange(const orc::ColumnStatistics * col_stats)
void HiveOrcFile::prepareReader()
{
// TODO To be implemented
throw Exception("Unimplemented HiveOrcFile::prepareReader", ErrorCodes::NOT_IMPLEMENTED);
in = std::make_unique<ReadBufferFromHDFS>(namenode_url, path, getContext()->getGlobalContext()->getConfigRef());
auto format_settings = getFormatSettings(getContext());
THROW_ARROW_NOT_OK(arrow::adapters::orc::ORCFileReader::Open(asArrowFile(*in, format_settings), arrow::default_memory_pool(), &reader));
}
void HiveOrcFile::prepareColumnMapping()
{
// TODO To be implemented
throw Exception("Unimplemented HiveOrcFile::prepareColumnMapping", ErrorCodes::NOT_IMPLEMENTED);
const orc::Type & type = reader->GetRawORCReader()->getType();
size_t size = type.getSubtypeCount();
for (size_t pos = 0; pos < size; pos++)
{
// hive中字符串不区分大小写。所以这里统一改成小写方便匹配
String column{type.getFieldName(pos)};
boost::to_lower(column);
orc_column_positions[column] = pos;
}
}
bool HiveOrcFile::hasMinMaxIndex() const
{
return false;
return !storage_settings->disable_orc_file_minmax_index;
}
std::unique_ptr<IMergeTreeDataPart::MinMaxIndex> HiveOrcFile::buildMinMaxIndex(const orc::Statistics * /*statistics*/)
std::unique_ptr<IMergeTreeDataPart::MinMaxIndex> HiveOrcFile::buildMinMaxIndex(const orc::Statistics * statistics)
{
// TODO To be implemented
throw Exception("Unimplemented HiveOrcFile::buildMinMaxIndex", ErrorCodes::NOT_IMPLEMENTED);
if (!statistics)
return nullptr;
size_t size = index_names_and_types.size();
auto idx = std::make_unique<IMergeTreeDataPart::MinMaxIndex>();
idx->hyperrectangle.resize(size);
size_t i = 0;
for (const auto & name_type : index_names_and_types)
{
String column{name_type.name};
boost::to_lower(column);
auto it = orc_column_positions.find(column);
if (it == orc_column_positions.end())
{
idx->hyperrectangle[i] = buildRange(nullptr);
// std::cerr << "statistics:nullptr" << std::endl;
}
else
{
size_t pos = it->second;
// 注意column statistics从1开始. 0有特殊用途
const orc::ColumnStatistics * col_stats = statistics->getColumnStatistics(pos + 1);
idx->hyperrectangle[i] = buildRange(col_stats);
// std::cerr << "statistics:" << col_stats->toString();
// std::cerr << "name:" << column << ", pos" << pos << ", range:" << idx->hyperrectangle[i].toString() << std::endl;
}
++i;
}
idx->initialized = true;
return idx;
}
void HiveOrcFile::loadMinMaxIndex()
{
// TODO To be implemented
throw Exception("Unimplemented HiveOrcFile::loadMinMaxIndex", ErrorCodes::NOT_IMPLEMENTED);
if (!reader)
{
prepareReader();
prepareColumnMapping();
}
auto statistics = reader->GetRawORCReader()->getStatistics();
minmax_idx = buildMinMaxIndex(statistics.get());
}
bool HiveOrcFile::hasSubMinMaxIndex() const
{
// TODO To be implemented
return false;
return !storage_settings->disable_orc_stripe_minmax_index;
}
void HiveOrcFile::loadSubMinMaxIndex()
{
// TODO To be implemented
throw Exception("Unimplemented HiveOrcFile::loadSubMinMaxIndex", ErrorCodes::NOT_IMPLEMENTED);
if (!reader)
{
prepareReader();
prepareColumnMapping();
}
auto * raw_reader = reader->GetRawORCReader();
auto stripe_num = raw_reader->getNumberOfStripes();
auto stripe_stats_num = raw_reader->getNumberOfStripeStatistics();
if (stripe_num != stripe_stats_num)
throw Exception(
fmt::format("orc file:{} has different strip num {} and strip statistics num {}", path, stripe_num, stripe_stats_num),
ErrorCodes::BAD_ARGUMENTS);
sub_minmax_idxes.resize(stripe_num);
for (size_t i = 0; i < stripe_num; ++i)
{
auto stripe_stats = raw_reader->getStripeStatistics(i);
sub_minmax_idxes[i] = buildMinMaxIndex(stripe_stats.get());
}
}
bool HiveParquetFile::hasSubMinMaxIndex() const
{
// TODO To be implemented
return false;
return !storage_settings->disable_parquet_rowgroup_minmax_index;
}
void HiveParquetFile::prepareReader()
{
// TODO To be implemented
throw Exception("Unimplemented HiveParquetFile::prepareReader", ErrorCodes::NOT_IMPLEMENTED);
in = std::make_unique<ReadBufferFromHDFS>(namenode_url, path, getContext()->getGlobalContext()->getConfigRef());
auto format_settings = getFormatSettings(getContext());
THROW_ARROW_NOT_OK(parquet::arrow::OpenFile(asArrowFile(*in, format_settings), arrow::default_memory_pool(), &reader));
}
void HiveParquetFile::loadSubMinMaxIndex()
{
// TODO To be implemented
throw Exception("Unimplemented HiveParquetFile::loadSubMinMaxIndex", ErrorCodes::NOT_IMPLEMENTED);
if (!reader)
prepareReader();
auto meta = reader->parquet_reader()->metadata();
size_t num_cols = meta->num_columns();
size_t num_row_groups = meta->num_row_groups();
const auto * schema = meta->schema();
for (size_t pos = 0; pos < num_cols; ++pos)
{
String column{schema->Column(pos)->name()};
boost::to_lower(column);
parquet_column_positions[column] = pos;
}
sub_minmax_idxes.resize(num_row_groups);
for (size_t i = 0; i < num_row_groups; ++i)
{
auto row_group_meta = meta->RowGroup(i);
sub_minmax_idxes[i] = std::make_shared<IMergeTreeDataPart::MinMaxIndex>();
sub_minmax_idxes[i]->hyperrectangle.resize(num_cols);
size_t j = 0;
auto it = index_names_and_types.begin();
for (; it != index_names_and_types.end(); ++j, ++it)
{
// 如果parquet file中不存在该字段使用空Range
String name{it->name};
boost::to_lower(name);
auto mit = parquet_column_positions.find(name);
if (mit == parquet_column_positions.end())
continue;
size_t pos = mit->second;
auto col_chunk = row_group_meta->ColumnChunk(pos);
if (!col_chunk->is_stats_set())
continue;
auto stats = col_chunk->statistics();
if (stats->HasNullCount() && stats->null_count() > 0)
continue;
if (auto bool_stats = std::dynamic_pointer_cast<parquet::BoolStatistics>(stats))
{
sub_minmax_idxes[i]->hyperrectangle[j] = createRangeFromParquetStatistics<UInt8>(bool_stats);
}
else if (auto int32_stats = std::dynamic_pointer_cast<parquet::Int32Statistics>(stats))
{
// Hive中没有unsigned interger, 这里不用考虑相关case
sub_minmax_idxes[i]->hyperrectangle[j] = createRangeFromParquetStatistics<Int32>(int32_stats);
}
else if (auto int64_stats = std::dynamic_pointer_cast<parquet::Int64Statistics>(stats))
{
sub_minmax_idxes[i]->hyperrectangle[j] = createRangeFromParquetStatistics<Int64>(int64_stats);
}
else if (auto float_stats = std::dynamic_pointer_cast<parquet::FloatStatistics>(stats))
{
sub_minmax_idxes[i]->hyperrectangle[j] = createRangeFromParquetStatistics<Float64>(float_stats);
}
else if (auto double_stats = std::dynamic_pointer_cast<parquet::FloatStatistics>(stats))
{
sub_minmax_idxes[i]->hyperrectangle[j] = createRangeFromParquetStatistics<Float64>(double_stats);
}
else if (auto string_stats = std::dynamic_pointer_cast<parquet::ByteArrayStatistics>(stats))
{
sub_minmax_idxes[i]->hyperrectangle[j] = createRangeFromParquetStatistics(string_stats);
}
// 其他类型无法使用minmax index, 跳过
}
sub_minmax_idxes[i]->initialized = true;
}
}
}

View File

@ -17,26 +17,18 @@
namespace orc
{
class Reader;
class Statistics;
class ColumnStatistics;
}
namespace parquet
{
class ParquetFileReader;
namespace arrow
namespace parquet::arrow
{
class FileReader;
}
}
namespace arrow
namespace arrow::adapters::orc
{
namespace io
{
class RandomAccessFile;
}
class Buffer;
class ORCFileReader;
}
namespace DB
@ -46,6 +38,7 @@ namespace ErrorCodes
extern const int NOT_IMPLEMENTED;
}
class ReadBufferFromHDFS;
class IHiveFile : public WithContext
{
public:
@ -230,7 +223,8 @@ protected:
virtual void prepareReader();
virtual void prepareColumnMapping();
std::shared_ptr<orc::Reader> reader;
std::unique_ptr<ReadBufferFromHDFS> in;
std::unique_ptr<arrow::adapters::orc::ORCFileReader> reader;
std::map<String, size_t> orc_column_positions;
};
@ -259,8 +253,8 @@ public:
protected:
virtual void prepareReader();
std::shared_ptr<arrow::fs::FileSystem> fs;
std::shared_ptr<parquet::ParquetFileReader> reader;
std::unique_ptr<ReadBufferFromHDFS> in;
std::unique_ptr<parquet::arrow::FileReader> reader;
std::map<String, size_t> parquet_column_positions;
};
}

View File

@ -2,12 +2,14 @@
#if USE_HIVE
#include <fmt/core.h>
#include <boost/algorithm/string/join.hpp>
#include <boost/algorithm/string/case_conv.hpp>
#include <fmt/core.h>
#include <Poco/URI.h>
#include <base/logger_useful.h>
#include <parquet/arrow/reader.h>
#include <arrow/adapters/orc/adapter.h>
#include <Columns/IColumn.h>
#include <Core/Block.h>
#include <Core/Field.h>