From dec083ab4406f44bc5c189f74297052116049b4b Mon Sep 17 00:00:00 2001 From: taiyang-li <654010905@qq.com> Date: Tue, 18 Jan 2022 18:47:25 +0800 Subject: [PATCH] add minmax index for hivengine; remove libhdfspp dependency for hive engine --- src/Storages/Hive/HiveFile.cpp | 203 ++++++++++++++++++++++++++---- src/Storages/Hive/HiveFile.h | 28 ++--- src/Storages/Hive/StorageHive.cpp | 6 +- 3 files changed, 194 insertions(+), 43 deletions(-) diff --git a/src/Storages/Hive/HiveFile.cpp b/src/Storages/Hive/HiveFile.cpp index b0cfa9809e1..a74a5e36575 100644 --- a/src/Storages/Hive/HiveFile.cpp +++ b/src/Storages/Hive/HiveFile.cpp @@ -1,22 +1,37 @@ +<<<<<<< HEAD #include +======= + +#include +>>>>>>> d9558cbca4... add minmax index for hivengine; remove libhdfspp dependency for hive engine #if USE_HIVE #include +<<<<<<< HEAD #include #include +======= +#include +#include +#include +#include +>>>>>>> d9558cbca4... add minmax index for hivengine; remove libhdfspp dependency for hive engine #include +#include #include -#include -#include #include #include #include +#include #include #include #include +#include +#include +#include #include #include @@ -28,10 +43,22 @@ namespace ErrorCodes namespace DB { +namespace ErrorCodes +{ + extern const int BAD_ARGUMENTS; +} + +#define THROW_ARROW_NOT_OK(status) \ + do \ + { \ + if (::arrow::Status _s = (status); !_s.ok()) \ + throw Exception(_s.ToString(), ErrorCodes::BAD_ARGUMENTS); \ + } while (false) + + template Range createRangeFromOrcStatistics(const StatisticsType * stats) { - /// We must check if there are minimum or maximum values in statistics in case of /// null values or NaN/Inf values of double type. if (stats->hasMinimum() && stats->hasMaximum()) { @@ -117,65 +144,193 @@ Range HiveOrcFile::buildRange(const orc::ColumnStatistics * col_stats) void HiveOrcFile::prepareReader() { - // TODO To be implemented - throw Exception("Unimplemented HiveOrcFile::prepareReader", ErrorCodes::NOT_IMPLEMENTED); + in = std::make_unique(namenode_url, path, getContext()->getGlobalContext()->getConfigRef()); + auto format_settings = getFormatSettings(getContext()); + THROW_ARROW_NOT_OK(arrow::adapters::orc::ORCFileReader::Open(asArrowFile(*in, format_settings), arrow::default_memory_pool(), &reader)); } void HiveOrcFile::prepareColumnMapping() { - // TODO To be implemented - throw Exception("Unimplemented HiveOrcFile::prepareColumnMapping", ErrorCodes::NOT_IMPLEMENTED); + const orc::Type & type = reader->GetRawORCReader()->getType(); + size_t size = type.getSubtypeCount(); + for (size_t pos = 0; pos < size; pos++) + { + // hive中字符串不区分大小写。所以这里统一改成小写,方便匹配 + String column{type.getFieldName(pos)}; + boost::to_lower(column); + orc_column_positions[column] = pos; + } } bool HiveOrcFile::hasMinMaxIndex() const { - return false; + return !storage_settings->disable_orc_file_minmax_index; } -std::unique_ptr HiveOrcFile::buildMinMaxIndex(const orc::Statistics * /*statistics*/) +std::unique_ptr HiveOrcFile::buildMinMaxIndex(const orc::Statistics * statistics) { - // TODO To be implemented - throw Exception("Unimplemented HiveOrcFile::buildMinMaxIndex", ErrorCodes::NOT_IMPLEMENTED); + if (!statistics) + return nullptr; + + size_t size = index_names_and_types.size(); + auto idx = std::make_unique(); + idx->hyperrectangle.resize(size); + + size_t i = 0; + for (const auto & name_type : index_names_and_types) + { + String column{name_type.name}; + boost::to_lower(column); + auto it = orc_column_positions.find(column); + if (it == orc_column_positions.end()) + { + idx->hyperrectangle[i] = buildRange(nullptr); + // std::cerr << "statistics:nullptr" << std::endl; + } + else + { + size_t pos = it->second; + // 注意:column statistics从1开始. 0有特殊用途 + const orc::ColumnStatistics * col_stats = statistics->getColumnStatistics(pos + 1); + idx->hyperrectangle[i] = buildRange(col_stats); + // std::cerr << "statistics:" << col_stats->toString(); + // std::cerr << "name:" << column << ", pos" << pos << ", range:" << idx->hyperrectangle[i].toString() << std::endl; + } + ++i; + } + idx->initialized = true; + return idx; } void HiveOrcFile::loadMinMaxIndex() { - // TODO To be implemented - throw Exception("Unimplemented HiveOrcFile::loadMinMaxIndex", ErrorCodes::NOT_IMPLEMENTED); + if (!reader) + { + prepareReader(); + prepareColumnMapping(); + } + + auto statistics = reader->GetRawORCReader()->getStatistics(); + minmax_idx = buildMinMaxIndex(statistics.get()); } bool HiveOrcFile::hasSubMinMaxIndex() const { - // TODO To be implemented - return false; + return !storage_settings->disable_orc_stripe_minmax_index; } void HiveOrcFile::loadSubMinMaxIndex() { - // TODO To be implemented - throw Exception("Unimplemented HiveOrcFile::loadSubMinMaxIndex", ErrorCodes::NOT_IMPLEMENTED); + if (!reader) + { + prepareReader(); + prepareColumnMapping(); + } + + auto * raw_reader = reader->GetRawORCReader(); + auto stripe_num = raw_reader->getNumberOfStripes(); + auto stripe_stats_num = raw_reader->getNumberOfStripeStatistics(); + if (stripe_num != stripe_stats_num) + throw Exception( + fmt::format("orc file:{} has different strip num {} and strip statistics num {}", path, stripe_num, stripe_stats_num), + ErrorCodes::BAD_ARGUMENTS); + + sub_minmax_idxes.resize(stripe_num); + for (size_t i = 0; i < stripe_num; ++i) + { + auto stripe_stats = raw_reader->getStripeStatistics(i); + sub_minmax_idxes[i] = buildMinMaxIndex(stripe_stats.get()); + } } bool HiveParquetFile::hasSubMinMaxIndex() const { - // TODO To be implemented - return false; + return !storage_settings->disable_parquet_rowgroup_minmax_index; } void HiveParquetFile::prepareReader() { - // TODO To be implemented - throw Exception("Unimplemented HiveParquetFile::prepareReader", ErrorCodes::NOT_IMPLEMENTED); + in = std::make_unique(namenode_url, path, getContext()->getGlobalContext()->getConfigRef()); + auto format_settings = getFormatSettings(getContext()); + THROW_ARROW_NOT_OK(parquet::arrow::OpenFile(asArrowFile(*in, format_settings), arrow::default_memory_pool(), &reader)); } - void HiveParquetFile::loadSubMinMaxIndex() { - // TODO To be implemented - throw Exception("Unimplemented HiveParquetFile::loadSubMinMaxIndex", ErrorCodes::NOT_IMPLEMENTED); + if (!reader) + prepareReader(); + + auto meta = reader->parquet_reader()->metadata(); + size_t num_cols = meta->num_columns(); + size_t num_row_groups = meta->num_row_groups(); + const auto * schema = meta->schema(); + for (size_t pos = 0; pos < num_cols; ++pos) + { + String column{schema->Column(pos)->name()}; + boost::to_lower(column); + parquet_column_positions[column] = pos; + } + + + sub_minmax_idxes.resize(num_row_groups); + for (size_t i = 0; i < num_row_groups; ++i) + { + auto row_group_meta = meta->RowGroup(i); + sub_minmax_idxes[i] = std::make_shared(); + sub_minmax_idxes[i]->hyperrectangle.resize(num_cols); + + size_t j = 0; + auto it = index_names_and_types.begin(); + for (; it != index_names_and_types.end(); ++j, ++it) + { + // 如果parquet file中不存在该字段,使用空Range + String name{it->name}; + boost::to_lower(name); + auto mit = parquet_column_positions.find(name); + if (mit == parquet_column_positions.end()) + continue; + + size_t pos = mit->second; + auto col_chunk = row_group_meta->ColumnChunk(pos); + if (!col_chunk->is_stats_set()) + continue; + + auto stats = col_chunk->statistics(); + if (stats->HasNullCount() && stats->null_count() > 0) + continue; + + if (auto bool_stats = std::dynamic_pointer_cast(stats)) + { + sub_minmax_idxes[i]->hyperrectangle[j] = createRangeFromParquetStatistics(bool_stats); + } + else if (auto int32_stats = std::dynamic_pointer_cast(stats)) + { + // Hive中没有unsigned interger, 这里不用考虑相关case + sub_minmax_idxes[i]->hyperrectangle[j] = createRangeFromParquetStatistics(int32_stats); + } + else if (auto int64_stats = std::dynamic_pointer_cast(stats)) + { + sub_minmax_idxes[i]->hyperrectangle[j] = createRangeFromParquetStatistics(int64_stats); + } + else if (auto float_stats = std::dynamic_pointer_cast(stats)) + { + sub_minmax_idxes[i]->hyperrectangle[j] = createRangeFromParquetStatistics(float_stats); + } + else if (auto double_stats = std::dynamic_pointer_cast(stats)) + { + sub_minmax_idxes[i]->hyperrectangle[j] = createRangeFromParquetStatistics(double_stats); + } + else if (auto string_stats = std::dynamic_pointer_cast(stats)) + { + sub_minmax_idxes[i]->hyperrectangle[j] = createRangeFromParquetStatistics(string_stats); + } + // 其他类型无法使用minmax index, 跳过 + } + sub_minmax_idxes[i]->initialized = true; + } } } diff --git a/src/Storages/Hive/HiveFile.h b/src/Storages/Hive/HiveFile.h index 63cca2562eb..7a3fcac312f 100644 --- a/src/Storages/Hive/HiveFile.h +++ b/src/Storages/Hive/HiveFile.h @@ -17,26 +17,18 @@ namespace orc { -class Reader; +class Statistics; +class ColumnStatistics; } -namespace parquet +namespace parquet::arrow { -class ParquetFileReader; -namespace arrow -{ - class FileReader; -} +class FileReader; } -namespace arrow +namespace arrow::adapters::orc { -namespace io -{ - class RandomAccessFile; -} - -class Buffer; +class ORCFileReader; } namespace DB @@ -46,6 +38,7 @@ namespace ErrorCodes extern const int NOT_IMPLEMENTED; } +class ReadBufferFromHDFS; class IHiveFile : public WithContext { public: @@ -230,7 +223,8 @@ protected: virtual void prepareReader(); virtual void prepareColumnMapping(); - std::shared_ptr reader; + std::unique_ptr in; + std::unique_ptr reader; std::map orc_column_positions; }; @@ -259,8 +253,8 @@ public: protected: virtual void prepareReader(); - std::shared_ptr fs; - std::shared_ptr reader; + std::unique_ptr in; + std::unique_ptr reader; std::map parquet_column_positions; }; } diff --git a/src/Storages/Hive/StorageHive.cpp b/src/Storages/Hive/StorageHive.cpp index 3040ad23283..ed9da822fb0 100644 --- a/src/Storages/Hive/StorageHive.cpp +++ b/src/Storages/Hive/StorageHive.cpp @@ -2,12 +2,14 @@ #if USE_HIVE -#include #include #include +#include #include - #include +#include +#include + #include #include #include