#include #if USE_HIVE #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace DB { namespace ErrorCodes { extern const int BAD_ARGUMENTS; } #define THROW_ARROW_NOT_OK(status) \ do \ { \ if (const ::arrow::Status & _s = (status); !_s.ok()) \ throw Exception(_s.ToString(), ErrorCodes::BAD_ARGUMENTS); \ } while (false) template Range createRangeFromOrcStatistics(const StatisticsType * stats) { /// Null values or NaN/Inf values of double type. if (stats->hasMinimum() && stats->hasMaximum()) { return Range(FieldType(stats->getMinimum()), true, FieldType(stats->getMaximum()), true); } else if (stats->hasMinimum()) { return Range::createLeftBounded(FieldType(stats->getMinimum()), true); } else if (stats->hasMaximum()) { return Range::createRightBounded(FieldType(stats->getMaximum()), true); } else { return Range(); } } template Range createRangeFromParquetStatistics(std::shared_ptr stats) { /// We must check if there are minimum or maximum values in statistics in case of /// null values or NaN/Inf values of double type. if (!stats->HasMinMax()) return Range(); return Range(FieldType(stats->min()), true, FieldType(stats->max()), true); } Range createRangeFromParquetStatistics(std::shared_ptr stats) { if (!stats->HasMinMax()) return Range(); String min_val(reinterpret_cast(stats->min().ptr), stats->min().len); String max_val(reinterpret_cast(stats->max().ptr), stats->max().len); return Range(min_val, true, max_val, true); } std::optional IHiveFile::getRows() { if (!rows) rows = getRowsImpl(); return rows; } Range HiveORCFile::buildRange(const orc::ColumnStatistics * col_stats) { if (!col_stats || col_stats->hasNull()) return {}; if (const auto * int_stats = dynamic_cast(col_stats)) { return createRangeFromOrcStatistics(int_stats); } else if (const auto * double_stats = dynamic_cast(col_stats)) { return createRangeFromOrcStatistics(double_stats); } else if (const auto * string_stats = dynamic_cast(col_stats)) { return createRangeFromOrcStatistics(string_stats); } else if (const auto * bool_stats = dynamic_cast(col_stats)) { auto false_cnt = bool_stats->getFalseCount(); auto true_cnt = bool_stats->getTrueCount(); if (false_cnt && true_cnt) { return Range(UInt8(0), true, UInt8(1), true); } else if (false_cnt) { return Range::createLeftBounded(UInt8(0), true); } else if (true_cnt) { return Range::createRightBounded(UInt8(1), true); } } else if (const auto * timestamp_stats = dynamic_cast(col_stats)) { return createRangeFromOrcStatistics(timestamp_stats); } else if (const auto * date_stats = dynamic_cast(col_stats)) { return createRangeFromOrcStatistics(date_stats); } return {}; } void HiveORCFile::prepareReader() { in = std::make_unique(namenode_url, path, getContext()->getGlobalContext()->getConfigRef()); auto format_settings = getFormatSettings(getContext()); std::atomic is_stopped{0}; auto result = arrow::adapters::orc::ORCFileReader::Open(asArrowFile(*in, format_settings, is_stopped), arrow::default_memory_pool()); THROW_ARROW_NOT_OK(result.status()); reader = std::move(result).ValueOrDie(); } void HiveORCFile::prepareColumnMapping() { const orc::Type & type = reader->GetRawORCReader()->getType(); size_t count = type.getSubtypeCount(); for (size_t pos = 0; pos < count; pos++) { /// Column names in hive is case-insensitive. String column{type.getFieldName(pos)}; boost::to_lower(column); orc_column_positions[column] = pos; } } bool HiveORCFile::hasMinMaxIndex() const { return storage_settings->enable_orc_file_minmax_index; } std::unique_ptr HiveORCFile::buildMinMaxIndex(const orc::Statistics * statistics) { if (!statistics) return nullptr; size_t range_num = index_names_and_types.size(); auto idx = std::make_unique(); idx->hyperrectangle.resize(range_num); size_t i = 0; for (const auto & name_type : index_names_and_types) { String column{name_type.name}; boost::to_lower(column); auto it = orc_column_positions.find(column); if (it == orc_column_positions.end()) { idx->hyperrectangle[i] = buildRange(nullptr); } else { size_t pos = it->second; /// Attention: column statistics start from 1. 0 has special purpose. const orc::ColumnStatistics * col_stats = statistics->getColumnStatistics(pos + 1); idx->hyperrectangle[i] = buildRange(col_stats); } ++i; } idx->initialized = true; return idx; } void HiveORCFile::loadMinMaxIndex() { if (!reader) { prepareReader(); prepareColumnMapping(); } auto statistics = reader->GetRawORCReader()->getStatistics(); minmax_idx = buildMinMaxIndex(statistics.get()); } bool HiveORCFile::hasSubMinMaxIndex() const { return storage_settings->enable_orc_stripe_minmax_index; } void HiveORCFile::loadSubMinMaxIndex() { if (!reader) { prepareReader(); prepareColumnMapping(); } auto * raw_reader = reader->GetRawORCReader(); auto stripe_num = raw_reader->getNumberOfStripes(); auto stripe_stats_num = raw_reader->getNumberOfStripeStatistics(); if (stripe_num != stripe_stats_num) throw Exception( fmt::format("orc file:{} has different strip num {} and strip statistics num {}", path, stripe_num, stripe_stats_num), ErrorCodes::BAD_ARGUMENTS); sub_minmax_idxes.resize(stripe_num); for (size_t i = 0; i < stripe_num; ++i) { auto stripe_stats = raw_reader->getStripeStatistics(i); sub_minmax_idxes[i] = buildMinMaxIndex(stripe_stats.get()); } } std::optional HiveORCFile::getRowsImpl() { if (!reader) { prepareReader(); prepareColumnMapping(); } auto * raw_reader = reader->GetRawORCReader(); return raw_reader->getNumberOfRows(); } bool HiveParquetFile::hasSubMinMaxIndex() const { return storage_settings->enable_parquet_rowgroup_minmax_index; } void HiveParquetFile::prepareReader() { in = std::make_unique(namenode_url, path, getContext()->getGlobalContext()->getConfigRef()); auto format_settings = getFormatSettings(getContext()); std::atomic is_stopped{0}; THROW_ARROW_NOT_OK(parquet::arrow::OpenFile(asArrowFile(*in, format_settings, is_stopped), arrow::default_memory_pool(), &reader)); } void HiveParquetFile::loadSubMinMaxIndex() { if (!reader) prepareReader(); auto meta = reader->parquet_reader()->metadata(); size_t num_cols = meta->num_columns(); size_t num_row_groups = meta->num_row_groups(); const auto * schema = meta->schema(); for (size_t pos = 0; pos < num_cols; ++pos) { String column{schema->Column(pos)->name()}; boost::to_lower(column); parquet_column_positions[column] = pos; } sub_minmax_idxes.resize(num_row_groups); for (size_t i = 0; i < num_row_groups; ++i) { auto row_group_meta = meta->RowGroup(i); sub_minmax_idxes[i] = std::make_shared(); sub_minmax_idxes[i]->hyperrectangle.resize(num_cols); size_t j = 0; auto it = index_names_and_types.begin(); for (; it != index_names_and_types.end(); ++j, ++it) { String column{it->name}; boost::to_lower(column); auto mit = parquet_column_positions.find(column); if (mit == parquet_column_positions.end()) continue; size_t pos = mit->second; auto col_chunk = row_group_meta->ColumnChunk(pos); if (!col_chunk->is_stats_set()) continue; auto stats = col_chunk->statistics(); if (stats->HasNullCount() && stats->null_count() > 0) continue; if (auto bool_stats = std::dynamic_pointer_cast(stats)) { sub_minmax_idxes[i]->hyperrectangle[j] = createRangeFromParquetStatistics(bool_stats); } else if (auto int32_stats = std::dynamic_pointer_cast(stats)) { sub_minmax_idxes[i]->hyperrectangle[j] = createRangeFromParquetStatistics(int32_stats); } else if (auto int64_stats = std::dynamic_pointer_cast(stats)) { sub_minmax_idxes[i]->hyperrectangle[j] = createRangeFromParquetStatistics(int64_stats); } else if (auto float_stats = std::dynamic_pointer_cast(stats)) { sub_minmax_idxes[i]->hyperrectangle[j] = createRangeFromParquetStatistics(float_stats); } else if (auto double_stats = std::dynamic_pointer_cast(stats)) { sub_minmax_idxes[i]->hyperrectangle[j] = createRangeFromParquetStatistics(double_stats); } else if (auto string_stats = std::dynamic_pointer_cast(stats)) { sub_minmax_idxes[i]->hyperrectangle[j] = createRangeFromParquetStatistics(string_stats); } /// Other types are not supported for minmax index, skip } sub_minmax_idxes[i]->initialized = true; } } std::optional HiveParquetFile::getRowsImpl() { if (!reader) prepareReader(); auto meta = reader->parquet_reader()->metadata(); return meta->num_rows(); } } #endif