From 38f149b533607d13d406c4efac4d8398ad4a1eb3 Mon Sep 17 00:00:00 2001
From: taiyang-li <654010905@qq.com>
Date: Mon, 4 Apr 2022 15:28:26 +0800
Subject: [PATCH] optimize trivial count hive query

---
 src/Storages/Hive/HiveFile.cpp    |  44 ++++-
 src/Storages/Hive/HiveFile.h      |  23 ++-
 src/Storages/Hive/StorageHive.cpp | 260 +++++++++++++++++++-----------
 src/Storages/Hive/StorageHive.h   |  51 ++++--
 4 files changed, 266 insertions(+), 112 deletions(-)

diff --git a/src/Storages/Hive/HiveFile.cpp b/src/Storages/Hive/HiveFile.cpp
index 407d9602b61..12d60c4d1b3 100644
--- a/src/Storages/Hive/HiveFile.cpp
+++ b/src/Storages/Hive/HiveFile.cpp
@@ -77,7 +77,14 @@ Range createRangeFromParquetStatistics(std::shared_ptr<ParquetStatistics> stats
 }
 
-Range HiveOrcFile::buildRange(const orc::ColumnStatistics * col_stats)
+std::optional<size_t> IHiveFile::getRows()
+{
+    if (!rows)
+        rows = getRowsImpl();
+    return rows;
+}
+
+Range HiveORCFile::buildRange(const orc::ColumnStatistics * col_stats)
 {
     if (!col_stats || col_stats->hasNull())
         return {};
@@ -122,7 +129,7 @@ Range HiveOrcFile::buildRange(const orc::ColumnStatistics * col_stats)
     return {};
 }
 
-void HiveOrcFile::prepareReader()
+void HiveORCFile::prepareReader()
 {
     in = std::make_unique<ReadBufferFromHDFS>(namenode_url, path, getContext()->getGlobalContext()->getConfigRef());
     auto format_settings = getFormatSettings(getContext());
@@ -132,7 +139,7 @@ void HiveOrcFile::prepareReader()
     reader = std::move(result).ValueOrDie();
 }
 
-void HiveOrcFile::prepareColumnMapping()
+void HiveORCFile::prepareColumnMapping()
 {
     const orc::Type & type = reader->GetRawORCReader()->getType();
     size_t count = type.getSubtypeCount();
@@ -145,13 +152,13 @@ void HiveOrcFile::prepareColumnMapping()
     }
 }
 
-bool HiveOrcFile::hasMinMaxIndex() const
+bool HiveORCFile::hasMinMaxIndex() const
 {
     return storage_settings->enable_orc_file_minmax_index;
 }
 
 
-std::unique_ptr<IMergeTreeDataPart::MinMaxIndex> HiveOrcFile::buildMinMaxIndex(const orc::Statistics * statistics)
+std::unique_ptr<IMergeTreeDataPart::MinMaxIndex> HiveORCFile::buildMinMaxIndex(const orc::Statistics * statistics)
 {
     if (!statistics)
         return nullptr;
@@ -184,7 +191,7 @@ std::unique_ptr<IMergeTreeDataPart::MinMaxIndex> HiveOrcFile::buildMinMaxIndex(c
 }
 
 
-void HiveOrcFile::loadMinMaxIndex()
+void HiveORCFile::loadMinMaxIndex()
 {
     if (!reader)
     {
@@ -196,13 +203,13 @@ void HiveOrcFile::loadMinMaxIndex()
     minmax_idx = buildMinMaxIndex(statistics.get());
 }
 
-bool HiveOrcFile::hasSubMinMaxIndex() const
+bool HiveORCFile::hasSubMinMaxIndex() const
 {
     return storage_settings->enable_orc_stripe_minmax_index;
 }
 
 
-void HiveOrcFile::loadSubMinMaxIndex()
+void HiveORCFile::loadSubMinMaxIndex()
 {
     if (!reader)
     {
@@ -226,6 +233,18 @@ void HiveOrcFile::loadSubMinMaxIndex()
     }
 }
 
+std::optional<size_t> HiveORCFile::getRowsImpl()
+{
+    if (!reader)
+    {
+        prepareReader();
+        prepareColumnMapping();
+    }
+
+    auto * raw_reader = reader->GetRawORCReader();
+    return raw_reader->getNumberOfRows();
+}
+
 bool HiveParquetFile::hasSubMinMaxIndex() const
 {
     return storage_settings->enable_parquet_rowgroup_minmax_index;
@@ -312,5 +331,14 @@ void HiveParquetFile::loadSubMinMaxIndex()
     }
 }
 
+std::optional<size_t> HiveParquetFile::getRowsImpl()
+{
+    if (!reader)
+        prepareReader();
+
+    auto meta = reader->parquet_reader()->metadata();
+    return meta->num_rows();
+}
+
 }
 #endif
diff --git a/src/Storages/Hive/HiveFile.h b/src/Storages/Hive/HiveFile.h
index aef9d72755a..21dddc1b68f 100644
--- a/src/Storages/Hive/HiveFile.h
+++ b/src/Storages/Hive/HiveFile.h
@@ -102,6 +102,12 @@ public:
 
     virtual String getPath() const { return path; }
 
+    virtual UInt64 getLastModTs() const { return last_modify_time; }
+
+    virtual size_t getSize() const { return size; }
+
+    std::optional<size_t> getRows();
+
     virtual FieldVector getPartitionValues() const { return partition_values; }
 
     virtual String getNamenodeUrl() { return namenode_url; }
 
@@ -144,15 +150,16 @@ public:
         return boost::algorithm::join(strs, "|");
     }
 
-    inline UInt64 getLastModTs() const { return last_modify_time; }
-    inline size_t getSize() const { return size; }
-
 protected:
+    virtual std::optional<size_t> getRowsImpl() = 0;
+
     FieldVector partition_values;
     String namenode_url;
     String path;
     UInt64 last_modify_time;
    size_t size;
+    std::optional<size_t> rows;
+
     NamesAndTypesList index_names_and_types;
     MinMaxIndexPtr minmax_idx;
     std::vector<MinMaxIndexPtr> sub_minmax_idxes;
@@ -182,12 +189,15 @@ public:
     virtual FileFormat getFormat() const override { return FileFormat::TEXT; }
 
     virtual String getName() const override { return "TEXT"; }
+
+protected:
+    std::optional<size_t> getRowsImpl() override { return {}; }
 };
 
-class HiveOrcFile : public IHiveFile
+class HiveORCFile : public IHiveFile
 {
 public:
-    HiveOrcFile(
+    HiveORCFile(
         const FieldVector & values_,
         const String & namenode_url_,
         const String & path_,
@@ -214,6 +224,8 @@ protected:
     virtual void prepareReader();
     virtual void prepareColumnMapping();
 
+    std::optional<size_t> getRowsImpl() override;
+
     std::unique_ptr<ReadBufferFromHDFS> in;
     std::unique_ptr<arrow::adapters::orc::ORCFileReader> reader;
     std::map<String, size_t> orc_column_positions;
@@ -243,6 +255,7 @@ public:
 
 protected:
     virtual void prepareReader();
+    std::optional<size_t> getRowsImpl() override;
 
     std::unique_ptr<ReadBufferFromHDFS> in;
     std::unique_ptr<parquet::arrow::FileReader> reader;
diff --git a/src/Storages/Hive/StorageHive.cpp b/src/Storages/Hive/StorageHive.cpp
index 97d735d4fc1..4d2a01f2c94 100644
--- a/src/Storages/Hive/StorageHive.cpp
+++ b/src/Storages/Hive/StorageHive.cpp
@@ -44,6 +44,7 @@ namespace ErrorCodes
     extern const int INVALID_PARTITION_VALUE;
     extern const int BAD_ARGUMENTS;
     extern const int CANNOT_OPEN_FILE;
+    extern const int LOGICAL_ERROR;
 }
 
 
@@ -419,7 +420,7 @@ HiveFilePtr createHiveFile(
     }
     else if (format_name == "ORC")
     {
-        hive_file = std::make_shared<HiveOrcFile>(fields, namenode_url, path, ts, size, index_names_and_types, hive_settings, context);
+        hive_file = std::make_shared<HiveORCFile>(fields, namenode_url, path, ts, size, index_names_and_types, hive_settings, context);
     }
     else if (format_name == "Parquet")
     {
@@ -432,24 +433,26 @@ HiveFilePtr createHiveFile(
     return hive_file;
 }
 
-std::vector<HiveFilePtr> StorageHive::collectHiveFilesFromPartition(
+HiveFiles StorageHive::collectHiveFilesFromPartition(
     const Apache::Hadoop::Hive::Partition & partition,
-    SelectQueryInfo & query_info,
+    const SelectQueryInfo & query_info,
     HiveTableMetadataPtr hive_table_metadata,
     const HDFSFSPtr & fs,
-    ContextPtr context_)
+    ContextPtr context_,
+    PruneLevel prune_level) const
 {
-    LOG_DEBUG(log, "Collect hive files from partition {}", boost::join(partition.values, ","));
+    LOG_DEBUG(
+        log, "Collect hive files from partition {}, prune level: {}", boost::join(partition.values, ","), pruneLevelToString(prune_level));
 
     /// Skip partition "__HIVE_DEFAULT_PARTITION__"
     bool has_default_partition = false;
     for (const auto & value : partition.values)
     {
         if (value == "__HIVE_DEFAULT_PARTITION__")
         {
             has_default_partition = true;
             break;
         }
     }
     if (has_default_partition)
         return {};
@@ -479,25 +482,29 @@ std::vector<HiveFilePtr> StorageHive::collectHiveFilesFromPartition(
     if (!reader->pull(block) || !block.rows())
         throw Exception("Could not parse partition value: " + wb.str(), ErrorCodes::INVALID_PARTITION_VALUE);
 
-    std::vector<Range> ranges;
-    ranges.reserve(partition_names.size());
+    /// Get partition values
     FieldVector fields(partition_names.size());
     for (size_t i = 0; i < partition_names.size(); ++i)
-    {
         block.getByPosition(i).column->get(0, fields[i]);
-        ranges.emplace_back(fields[i]);
+
+    if (prune_level >= PruneLevel::Partition)
+    {
+        std::vector<Range> ranges;
+        ranges.reserve(partition_names.size());
+        for (size_t i = 0; i < partition_names.size(); ++i)
+            ranges.emplace_back(fields[i]);
+
+        const KeyCondition partition_key_condition(query_info, getContext(), partition_names, partition_minmax_idx_expr);
+        if (!partition_key_condition.checkInHyperrectangle(ranges, partition_types).can_be_true)
+            return {};
     }
 
-    const KeyCondition partition_key_condition(query_info, getContext(), partition_names, partition_minmax_idx_expr);
-    if (!partition_key_condition.checkInHyperrectangle(ranges, partition_types).can_be_true)
-        return {};
-
+    HiveFiles hive_files;
     auto file_infos = listDirectory(partition.sd.location, hive_table_metadata, fs);
-    std::vector<HiveFilePtr> hive_files;
     hive_files.reserve(file_infos.size());
     for (const auto & file_info : file_infos)
     {
-        auto hive_file = createHiveFileIfNeeded(file_info, fields, query_info, context_);
+        auto hive_file = createHiveFileIfNeeded(file_info, fields, query_info, context_, prune_level);
         if (hive_file)
             hive_files.push_back(hive_file);
     }
@@ -511,12 +518,17 @@ StorageHive::listDirectory(const String & path, HiveTableMetadataPtr hive_table_
 }
 
 HiveFilePtr StorageHive::createHiveFileIfNeeded(
-    const FileInfo & file_info, const FieldVector & fields, SelectQueryInfo & query_info, ContextPtr context_)
+    const FileInfo & file_info,
+    const FieldVector & fields,
+    const SelectQueryInfo & query_info,
+    ContextPtr context_,
+    PruneLevel prune_level) const
 {
-    LOG_TRACE(log, "Append hive file {}", file_info.path);
+    LOG_TRACE(log, "Create hive file {} if needed, prune level: {}", file_info.path, pruneLevelToString(prune_level));
+
     String filename = getBaseName(file_info.path);
     /// Skip temporary files starts with '.'
-    if (filename.find('.') == 0)
+    if (startsWith(filename, "."))
         return {};
 
     auto hive_file = createHiveFile(
@@ -531,34 +543,44 @@ HiveFilePtr StorageHive::createHiveFileIfNeeded(
         context_);
 
     /// Load file level minmax index and apply
-    const KeyCondition hivefile_key_condition(query_info, getContext(), hivefile_name_types.getNames(), hivefile_minmax_idx_expr);
-    if (hive_file->hasMinMaxIndex())
-    {
-        hive_file->loadMinMaxIndex();
-        if (!hivefile_key_condition.checkInHyperrectangle(hive_file->getMinMaxIndex()->hyperrectangle, hivefile_name_types.getTypes())
-                 .can_be_true)
-        {
-            LOG_TRACE(log, "Skip hive file {} by index {}", hive_file->getPath(), hive_file->describeMinMaxIndex(hive_file->getMinMaxIndex()));
-            return {};
-        }
-    }
-
-    /// Load sub-file level minmax index and apply
-    if (hive_file->hasSubMinMaxIndex())
-    {
-        std::set<int> skip_splits;
-        hive_file->loadSubMinMaxIndex();
-        const auto & sub_minmax_idxes = hive_file->getSubMinMaxIndexes();
-        for (size_t i = 0; i < sub_minmax_idxes.size(); ++i)
-        {
-            if (!hivefile_key_condition.checkInHyperrectangle(sub_minmax_idxes[i]->hyperrectangle, hivefile_name_types.getTypes())
-                     .can_be_true)
-            {
-                LOG_TRACE(log, "Skip split {} of hive file {}", i, hive_file->getPath());
-                skip_splits.insert(i);
-            }
-        }
-        hive_file->setSkipSplits(skip_splits);
-    }
+    if (prune_level >= PruneLevel::File)
+    {
+        const KeyCondition hivefile_key_condition(query_info, getContext(), hivefile_name_types.getNames(), hivefile_minmax_idx_expr);
+        if (hive_file->hasMinMaxIndex())
+        {
+            hive_file->loadMinMaxIndex();
+            if (!hivefile_key_condition.checkInHyperrectangle(hive_file->getMinMaxIndex()->hyperrectangle, hivefile_name_types.getTypes())
+                     .can_be_true)
+            {
+                LOG_TRACE(
+                    log,
+                    "Skip hive file {} by index {}",
+                    hive_file->getPath(),
+                    hive_file->describeMinMaxIndex(hive_file->getMinMaxIndex()));
+                return {};
+            }
+        }
+
+        if (prune_level >= PruneLevel::Split)
+        {
+            /// Load sub-file level minmax index and apply
+            if (hive_file->hasSubMinMaxIndex())
+            {
+                std::set<int> skip_splits;
+                hive_file->loadSubMinMaxIndex();
+                const auto & sub_minmax_idxes = hive_file->getSubMinMaxIndexes();
+                for (size_t i = 0; i < sub_minmax_idxes.size(); ++i)
+                {
+                    if (!hivefile_key_condition.checkInHyperrectangle(sub_minmax_idxes[i]->hyperrectangle, hivefile_name_types.getTypes())
+                             .can_be_true)
+                    {
+                        LOG_TRACE(log, "Skip split {} of hive file {}", i, hive_file->getPath());
+                        skip_splits.insert(i);
+                    }
+                }
+                hive_file->setSkipSplits(skip_splits);
+            }
+        }
+    }
     return hive_file;
 }
@@ -591,6 +613,7 @@ void StorageHive::getActualColumnsToRead(Block & sample_block, const Block & hea
         }
     }
 }
+
 Pipe StorageHive::read(
     const Names & column_names,
     const StorageSnapshotPtr & storage_snapshot,
@@ -607,47 +630,9 @@ Pipe StorageHive::read(
     auto hive_metastore_client = HiveMetastoreClientFactory::instance().getOrCreate(hive_metastore_url, getContext());
     auto hive_table_metadata = hive_metastore_client->getTableMetadata(hive_database, hive_table);
 
-    std::vector<Apache::Hadoop::Hive::Partition> partitions = hive_table_metadata->getPartitions();
-    /// Hive files to read
-    HiveFiles hive_files;
-    /// Mutext to protect hive_files, which maybe appended in multiple threads
-    std::mutex hive_files_mutex;
-
-    ThreadPool pool{num_streams};
-    if (!partitions.empty())
-    {
-        for (const auto & partition : partitions)
-        {
-            pool.scheduleOrThrowOnError([&]()
-            {
-                auto hive_files_in_partition = collectHiveFilesFromPartition(partition, query_info, hive_table_metadata, fs, context_);
-                if (!hive_files_in_partition.empty())
-                {
-                    std::lock_guard lock(hive_files_mutex);
-                    hive_files.insert(std::end(hive_files), std::begin(hive_files_in_partition), std::end(hive_files_in_partition));
-                }
-            });
-        }
-        pool.wait();
-    }
-    else if (partition_name_types.empty()) /// Partition keys is empty
-    {
-        auto file_infos = listDirectory(hive_table_metadata->getTable()->sd.location, hive_table_metadata, fs);
-        for (const auto & file_info : file_infos)
-        {
-            pool.scheduleOrThrowOnError([&]
-            {
-                auto hive_file = createHiveFileIfNeeded(file_info, {}, query_info, context_);
-                if (hive_file)
-                {
-                    std::lock_guard lock(hive_files_mutex);
-                    hive_files.push_back(hive_file);
-                }
-            });
-        }
-        pool.wait();
-    }
-    else /// Partition keys is not empty but partitions is empty
+    /// Collect Hive files to read
+    HiveFiles hive_files = collectHiveFiles(num_streams, query_info, hive_table_metadata, fs, context_);
+    if (hive_files.empty())
         return {};
 
     auto sources_info = std::make_shared<StorageHiveSource::SourcesInfo>();
@@ -689,6 +674,63 @@ Pipe StorageHive::read(
     return Pipe::unitePipes(std::move(pipes));
 }
 
+HiveFiles StorageHive::collectHiveFiles(
+    unsigned max_threads,
+    const SelectQueryInfo & query_info,
+    HiveTableMetadataPtr hive_table_metadata,
+    const HDFSFSPtr & fs,
+    ContextPtr context_,
+    PruneLevel prune_level) const
+{
+    std::vector<Apache::Hadoop::Hive::Partition> partitions = hive_table_metadata->getPartitions();
+    /// The table is partitioned, but no partitions exist yet: there is nothing to read
+    if (!partition_name_types.empty() && partitions.empty())
+        return {};
+
+    /// Hive files to collect
+    HiveFiles hive_files;
+    /// Mutex to protect hive_files, which may be appended by multiple threads
+    std::mutex hive_files_mutex;
+    ThreadPool pool{max_threads};
+    if (!partitions.empty())
+    {
+        for (const auto & partition : partitions)
+        {
+            pool.scheduleOrThrowOnError(
+                [&]()
+                {
+                    auto hive_files_in_partition
+                        = collectHiveFilesFromPartition(partition, query_info, hive_table_metadata, fs, context_, prune_level);
+                    if (!hive_files_in_partition.empty())
+                    {
+                        std::lock_guard lock(hive_files_mutex);
+                        hive_files.insert(std::end(hive_files), std::begin(hive_files_in_partition), std::end(hive_files_in_partition));
+                    }
+                });
+        }
+    }
+    else /// Partition keys are empty, but there are still files to read
+    {
+        auto file_infos = listDirectory(hive_table_metadata->getTable()->sd.location, hive_table_metadata, fs);
+        for (const auto & file_info : file_infos)
+        {
+            pool.scheduleOrThrowOnError(
+                [&]()
+                {
+                    auto hive_file = createHiveFileIfNeeded(file_info, {}, query_info, context_, prune_level);
+                    if (hive_file)
+                    {
+                        std::lock_guard lock(hive_files_mutex);
+                        hive_files.push_back(hive_file);
+                    }
+                });
+        }
+    }
+    pool.wait();
+    return hive_files;
+}
+
 SinkToStoragePtr StorageHive::write(const ASTPtr & /*query*/, const StorageMetadataPtr & /* metadata_snapshot*/, ContextPtr /*context*/)
 {
     throw Exception("Method write is not implemented for StorageHive", ErrorCodes::NOT_IMPLEMENTED);
@@ -701,6 +743,44 @@ NamesAndTypesList StorageHive::getVirtuals() const
         {"_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())}};
 }
 
+std::optional<UInt64> StorageHive::totalRows(const Settings & settings) const
+{
+    /// query_info is not used when prune_level == PruneLevel::None
+    SelectQueryInfo query_info;
+    return totalRowsImpl(settings, query_info, getContext(), PruneLevel::None);
+}
+
+std::optional<UInt64> StorageHive::totalRowsByPartitionPredicate(const SelectQueryInfo & query_info, ContextPtr context_) const
+{
+    return totalRowsImpl(context_->getSettingsRef(), query_info, context_, PruneLevel::Partition);
+}
+
+std::optional<UInt64>
+StorageHive::totalRowsImpl(const Settings & settings, const SelectQueryInfo & query_info, ContextPtr context_, PruneLevel prune_level) const
+{
+    /// Row-based formats like Text don't provide row counts in file metadata, so trivial counts can't be served for them
+    if (!isColumnOriented())
+        return {};
+
+    auto hive_metastore_client = HiveMetastoreClientFactory::instance().getOrCreate(hive_metastore_url, getContext());
+    auto hive_table_metadata = hive_metastore_client->getTableMetadata(hive_database, hive_table);
+    HDFSBuilderWrapper builder = createHDFSBuilder(hdfs_namenode_url, getContext()->getGlobalContext()->getConfigRef());
+    HDFSFSPtr fs = createHDFSFS(builder.get());
+    HiveFiles hive_files = collectHiveFiles(settings.max_threads, query_info, hive_table_metadata, fs, context_, prune_level);
+
+    UInt64 total_rows = 0;
+    for (const auto & hive_file : hive_files)
+    {
+        auto file_rows = hive_file->getRows();
+        if (!file_rows)
+            throw Exception(
+                ErrorCodes::LOGICAL_ERROR, "Rows of hive file {} with format {} are not initialized", hive_file->getPath(), format_name);
+        total_rows += *file_rows;
+    }
+    return total_rows;
+}
+
 
 void registerStorageHive(StorageFactory & factory)
 {
     factory.registerStorage(
diff --git a/src/Storages/Hive/StorageHive.h b/src/Storages/Hive/StorageHive.h
index bdf37cc9f04..1470b883585 100644
--- a/src/Storages/Hive/StorageHive.h
+++ b/src/Storages/Hive/StorageHive.h
@@ -26,7 +26,6 @@ class HiveSettings;
 class StorageHive final : public shared_ptr_helper<StorageHive>, public IStorage, WithContext
 {
     friend struct shared_ptr_helper<StorageHive>;
-
 public:
     String getName() const override { return "Hive"; }
 
@@ -54,6 +53,9 @@ public:
 
     bool isColumnOriented() const override;
 
+    std::optional<UInt64> totalRows(const Settings & settings) const override;
+    std::optional<UInt64> totalRowsByPartitionPredicate(const SelectQueryInfo & query_info, ContextPtr context_) const override;
+
 protected:
     friend class StorageHiveSource;
     StorageHive(
@@ -73,24 +75,56 @@ private:
     using FileInfo = HiveMetastoreClient::FileInfo;
     using HiveTableMetadataPtr = HiveMetastoreClient::HiveTableMetadataPtr;
 
+    enum class PruneLevel
+    {
+        None, /// Do not prune
+        Partition,
+        File,
+        Split,
+        Max = Split,
+    };
+
+    static String pruneLevelToString(PruneLevel level)
+    {
+        return String(magic_enum::enum_name(level));
+    }
+
     static ASTPtr extractKeyExpressionList(const ASTPtr & node);
 
     static std::vector<FileInfo> listDirectory(const String & path, HiveTableMetadataPtr hive_table_metadata, const HDFSFSPtr & fs);
 
     void initMinMaxIndexExpression();
 
-    std::vector<HiveFilePtr> collectHiveFilesFromPartition(
-        const Apache::Hadoop::Hive::Partition & partition,
-        SelectQueryInfo & query_info,
+    HiveFiles collectHiveFiles(
+        unsigned max_threads,
+        const SelectQueryInfo & query_info,
         HiveTableMetadataPtr hive_table_metadata,
         const HDFSFSPtr & fs,
-        ContextPtr context_);
+        ContextPtr context_,
+        PruneLevel prune_level = PruneLevel::Max) const;
 
-    HiveFilePtr
-    createHiveFileIfNeeded(const FileInfo & file_info, const FieldVector & fields, SelectQueryInfo & query_info, ContextPtr context_);
+    HiveFiles collectHiveFilesFromPartition(
+        const Apache::Hadoop::Hive::Partition & partition,
+        const SelectQueryInfo & query_info,
+        HiveTableMetadataPtr hive_table_metadata,
+        const HDFSFSPtr & fs,
+        ContextPtr context_,
+        PruneLevel prune_level = PruneLevel::Max) const;
+
+    HiveFilePtr createHiveFileIfNeeded(
+        const FileInfo & file_info,
+        const FieldVector & fields,
+        const SelectQueryInfo & query_info,
+        ContextPtr context_,
+        PruneLevel prune_level = PruneLevel::Max) const;
 
     void getActualColumnsToRead(Block & sample_block, const Block & header_block, const NameSet & partition_columns) const;
 
+    void lazyInitialize();
+
+    std::optional<UInt64>
+    totalRowsImpl(const Settings & settings, const SelectQueryInfo & query_info, ContextPtr context_, PruneLevel prune_level) const;
+
     String hive_metastore_url;
 
     /// Hive database and table
@@ -122,9 +156,8 @@ private:
     std::shared_ptr<HiveSettings> storage_settings;
 
     Poco::Logger * log = &Poco::Logger::get("StorageHive");
-
-    void lazyInitialize();
 };
+
 }
 #endif
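-- 

The heart of this patch is a per-file row-count contract plus an aggregation
over the collected files. The standalone C++ sketch below restates that
contract for readers outside the ClickHouse tree; every name in it
(FileWithRows, ColumnarFile, TextLikeFile, totalRows) is a hypothetical
stand-in rather than an actual ClickHouse type, and the error handling is
simplified: where StorageHive::totalRowsImpl() throws LOGICAL_ERROR on a
missing count (only column-oriented tables reach that loop), the sketch just
returns std::nullopt.

    #include <cassert>
    #include <cstddef>
    #include <cstdint>
    #include <memory>
    #include <optional>
    #include <vector>

    /// Analogue of IHiveFile::getRows()/getRowsImpl(): the row count is computed
    /// lazily by the format-specific override and memoized for later calls.
    class FileWithRows
    {
    public:
        virtual ~FileWithRows() = default;

        std::optional<size_t> getRows()
        {
            if (!rows)
                rows = getRowsImpl();
            return rows;
        }

    protected:
        virtual std::optional<size_t> getRowsImpl() = 0;

    private:
        std::optional<size_t> rows;
    };

    /// Column-oriented formats (ORC, Parquet) read the count from file metadata,
    /// like raw_reader->getNumberOfRows() and metadata->num_rows() in the patch.
    class ColumnarFile : public FileWithRows
    {
    public:
        explicit ColumnarFile(size_t metadata_rows_) : metadata_rows(metadata_rows_) {}

    protected:
        std::optional<size_t> getRowsImpl() override { return metadata_rows; }

    private:
        size_t metadata_rows;
    };

    /// Row-based formats (Text) have no such metadata and report nothing.
    class TextLikeFile : public FileWithRows
    {
    protected:
        std::optional<size_t> getRowsImpl() override { return {}; }
    };

    /// Analogue of StorageHive::totalRowsImpl(): sum the per-file counts, giving
    /// up as soon as one file cannot report its row count.
    std::optional<uint64_t> totalRows(const std::vector<std::shared_ptr<FileWithRows>> & files)
    {
        uint64_t total = 0;
        for (const auto & file : files)
        {
            auto file_rows = file->getRows();
            if (!file_rows)
                return {};
            total += *file_rows;
        }
        return total;
    }

    int main()
    {
        std::vector<std::shared_ptr<FileWithRows>> files
            = {std::make_shared<ColumnarFile>(100), std::make_shared<ColumnarFile>(42)};
        assert(totalRows(files).value() == 142);

        files.push_back(std::make_shared<TextLikeFile>());
        assert(!totalRows(files)); /// one file without metadata spoils the total
    }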
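A note on the PruneLevel choices above, as far as the patch itself shows them.
totalRows() uses PruneLevel::None because a bare count() has no predicate, so
every file must be counted. totalRowsByPartitionPredicate() uses
PruneLevel::Partition, presumably because partition pruning is exact: all rows
of a Hive partition share the same partition values, so a pruned partition
contributes no matching rows and a kept one contributes all of them. File- and
split-level minmax pruning, by contrast, only proves that no row can match the
whole WHERE clause; applying it while counting by a partition predicate would
drop files whose rows do satisfy that predicate and undercount. Regular reads
keep the default PruneLevel::Max and skip as much data as possible. With these
overrides in place, a query such as SELECT count() FROM hive_table can be
answered from ORC/Parquet metadata alone -- in ClickHouse this path is gated by
the optimize_trivial_count_query setting -- which is what the patch title
refers to.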