mirror of https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-27 18:12:02 +00:00

optimize trivial count hive query

This commit teaches the Hive storage engine to answer trivial count queries from file-level metadata: each Hive file can now report its row count through a new getRows()/getRowsImpl() interface, and pruning becomes tunable through a PruneLevel so the counting path keeps every candidate file while still skipping irrelevant partitions.

This commit is contained in:
parent 4e2d5f1841
commit 38f149b533
@@ -77,7 +77,14 @@ Range createRangeFromParquetStatistics(std::shared_ptr<parquet::ByteArrayStatist
     return Range(min_val, true, max_val, true);
 }
 
-Range HiveOrcFile::buildRange(const orc::ColumnStatistics * col_stats)
+std::optional<size_t> IHiveFile::getRows()
+{
+    if (!rows)
+        rows = getRowsImpl();
+    return rows;
+}
+
+Range HiveORCFile::buildRange(const orc::ColumnStatistics * col_stats)
 {
     if (!col_stats || col_stats->hasNull())
         return {};
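
Note: the new getRows() is a memoized accessor. The first call pays for one metadata read through the format-specific getRowsImpl(); every later call is served from the cached rows member. A minimal standalone sketch of the pattern (CountedFile and fetchFromMetadata are illustrative names, not part of the patch):

    #include <cstddef>
    #include <cstdio>
    #include <optional>

    struct CountedFile
    {
        std::optional<size_t> getRows()
        {
            if (!rows)
                rows = fetchFromMetadata(); /// only the first call does real work
            return rows;
        }

    private:
        std::optional<size_t> fetchFromMetadata() { return 1000; } /// stand-in for getRowsImpl()
        std::optional<size_t> rows; /// cache, like IHiveFile::rows
    };

    int main()
    {
        CountedFile file;
        std::printf("%zu\n", *file.getRows()); /// triggers the metadata read
        std::printf("%zu\n", *file.getRows()); /// answered from the cache
        return 0;
    }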
@@ -122,7 +129,7 @@ Range HiveOrcFile::buildRange(const orc::ColumnStatistics * col_stats)
     return {};
 }
 
-void HiveOrcFile::prepareReader()
+void HiveORCFile::prepareReader()
 {
     in = std::make_unique<ReadBufferFromHDFS>(namenode_url, path, getContext()->getGlobalContext()->getConfigRef());
     auto format_settings = getFormatSettings(getContext());
@@ -132,7 +139,7 @@ void HiveOrcFile::prepareReader()
     reader = std::move(result).ValueOrDie();
 }
 
-void HiveOrcFile::prepareColumnMapping()
+void HiveORCFile::prepareColumnMapping()
 {
     const orc::Type & type = reader->GetRawORCReader()->getType();
     size_t count = type.getSubtypeCount();
@@ -145,13 +152,13 @@ void HiveOrcFile::prepareColumnMapping()
     }
 }
 
-bool HiveOrcFile::hasMinMaxIndex() const
+bool HiveORCFile::hasMinMaxIndex() const
 {
     return storage_settings->enable_orc_file_minmax_index;
 }
 
 
-std::unique_ptr<IMergeTreeDataPart::MinMaxIndex> HiveOrcFile::buildMinMaxIndex(const orc::Statistics * statistics)
+std::unique_ptr<IMergeTreeDataPart::MinMaxIndex> HiveORCFile::buildMinMaxIndex(const orc::Statistics * statistics)
 {
     if (!statistics)
         return nullptr;
@@ -184,7 +191,7 @@ std::unique_ptr<IMergeTreeDataPart::MinMaxIndex> HiveOrcFile::buildMinMaxIndex(c
 }
 
 
-void HiveOrcFile::loadMinMaxIndex()
+void HiveORCFile::loadMinMaxIndex()
 {
     if (!reader)
     {
@@ -196,13 +203,13 @@ void HiveOrcFile::loadMinMaxIndex()
     minmax_idx = buildMinMaxIndex(statistics.get());
 }
 
-bool HiveOrcFile::hasSubMinMaxIndex() const
+bool HiveORCFile::hasSubMinMaxIndex() const
 {
     return storage_settings->enable_orc_stripe_minmax_index;
 }
 
 
-void HiveOrcFile::loadSubMinMaxIndex()
+void HiveORCFile::loadSubMinMaxIndex()
 {
     if (!reader)
     {
@@ -226,6 +233,18 @@ void HiveOrcFile::loadSubMinMaxIndex()
     }
 }
 
+std::optional<size_t> HiveORCFile::getRowsImpl()
+{
+    if (!reader)
+    {
+        prepareReader();
+        prepareColumnMapping();
+    }
+
+    auto * raw_reader = reader->GetRawORCReader();
+    return raw_reader->getNumberOfRows();
+}
+
 bool HiveParquetFile::hasSubMinMaxIndex() const
 {
     return storage_settings->enable_parquet_rowgroup_minmax_index;
@@ -312,5 +331,14 @@ void HiveParquetFile::loadSubMinMaxIndex()
     }
 }
 
+std::optional<size_t> HiveParquetFile::getRowsImpl()
+{
+    if (!reader)
+        prepareReader();
+
+    auto meta = reader->parquet_reader()->metadata();
+    return meta->num_rows();
+}
+
 }
 #endif
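
Both getRowsImpl() implementations above answer from file metadata alone: the ORC reader exposes the footer's total row count via orc::Reader::getNumberOfRows(), and Parquet keeps num_rows in its footer FileMetaData, so no data pages are scanned. A standalone sketch of the Parquet half, assuming Arrow's parquet library and an illustrative local file path:

    #include <iostream>
    #include <parquet/file_reader.h>

    int main()
    {
        /// OpenFile parses only the footer, so the cost is O(metadata), not O(data).
        auto reader = parquet::ParquetFileReader::OpenFile("part-00000.parquet");
        std::cout << "rows: " << reader->metadata()->num_rows() << "\n";
        return 0;
    }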
@@ -102,6 +102,12 @@ public:
 
     virtual String getPath() const { return path; }
 
+    virtual UInt64 getLastModTs() const { return last_modify_time; }
+
+    virtual size_t getSize() const { return size; }
+
+    std::optional<size_t> getRows();
+
     virtual FieldVector getPartitionValues() const { return partition_values; }
 
     virtual String getNamenodeUrl() { return namenode_url; }
@@ -144,15 +150,16 @@ public:
         return boost::algorithm::join(strs, "|");
     }
 
-    inline UInt64 getLastModTs() const { return last_modify_time; }
-    inline size_t getSize() const { return size; }
-
 protected:
+    virtual std::optional<size_t> getRowsImpl() = 0;
+
     FieldVector partition_values;
     String namenode_url;
     String path;
     UInt64 last_modify_time;
     size_t size;
+    std::optional<size_t> rows;
+
     NamesAndTypesList index_names_and_types;
     MinMaxIndexPtr minmax_idx;
     std::vector<MinMaxIndexPtr> sub_minmax_idxes;
@@ -182,12 +189,15 @@ public:
 
     virtual FileFormat getFormat() const override { return FileFormat::TEXT; }
     virtual String getName() const override { return "TEXT"; }
+
+protected:
+    std::optional<size_t> getRowsImpl() override { return {}; }
 };
 
-class HiveOrcFile : public IHiveFile
+class HiveORCFile : public IHiveFile
 {
 public:
-    HiveOrcFile(
+    HiveORCFile(
         const FieldVector & values_,
         const String & namenode_url_,
         const String & path_,
@@ -214,6 +224,8 @@ protected:
     virtual void prepareReader();
     virtual void prepareColumnMapping();
 
+    std::optional<size_t> getRowsImpl() override;
+
     std::unique_ptr<ReadBufferFromHDFS> in;
     std::unique_ptr<arrow::adapters::orc::ORCFileReader> reader;
     std::map<String, size_t> orc_column_positions;
@@ -243,6 +255,7 @@ public:
 
 protected:
     virtual void prepareReader();
+    std::optional<size_t> getRowsImpl() override;
 
     std::unique_ptr<ReadBufferFromHDFS> in;
     std::unique_ptr<parquet::arrow::FileReader> reader;
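
In this header, getRowsImpl() becomes a pure virtual on IHiveFile with one override per format. HiveTextFile returns an empty optional because a row-based text file carries no footer with a row count; the empty optional is the "unknown" signal that later disables the trivial count path. A small sketch of that convention (illustrative free functions, not from the patch):

    #include <cstddef>
    #include <optional>

    std::optional<size_t> textRows()    { return {}; }   /// no footer: count unknown
    std::optional<size_t> parquetRows() { return 1000; } /// footer count: known

    /// The trivial count path is only usable when every file reports a value.
    bool canUseTrivialCount(const std::optional<size_t> & rows) { return rows.has_value(); }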
@@ -44,6 +44,7 @@ namespace ErrorCodes
     extern const int INVALID_PARTITION_VALUE;
     extern const int BAD_ARGUMENTS;
     extern const int CANNOT_OPEN_FILE;
+    extern const int LOGICAL_ERROR;
 }
 
 
@@ -419,7 +420,7 @@ HiveFilePtr createHiveFile(
     }
     else if (format_name == "ORC")
     {
-        hive_file = std::make_shared<HiveOrcFile>(fields, namenode_url, path, ts, size, index_names_and_types, hive_settings, context);
+        hive_file = std::make_shared<HiveORCFile>(fields, namenode_url, path, ts, size, index_names_and_types, hive_settings, context);
     }
     else if (format_name == "Parquet")
     {
@@ -432,24 +433,26 @@ HiveFilePtr createHiveFile(
     return hive_file;
 }
 
-std::vector<HiveFilePtr> StorageHive::collectHiveFilesFromPartition(
+HiveFiles StorageHive::collectHiveFilesFromPartition(
     const Apache::Hadoop::Hive::Partition & partition,
-    SelectQueryInfo & query_info,
+    const SelectQueryInfo & query_info,
     HiveTableMetadataPtr hive_table_metadata,
     const HDFSFSPtr & fs,
-    ContextPtr context_)
+    ContextPtr context_,
+    PruneLevel prune_level) const
 {
-    LOG_DEBUG(log, "Collect hive files from partition {}", boost::join(partition.values, ","));
+    LOG_DEBUG(
+        log, "Collect hive files from partition {}, prune_level:{}", boost::join(partition.values, ","), pruneLevelToString(prune_level));
 
     /// Skip partition "__HIVE_DEFAULT_PARTITION__"
     bool has_default_partition = false;
     for (const auto & value : partition.values)
     {
         if (value == "__HIVE_DEFAULT_PARTITION__")
         {
             has_default_partition = true;
             break;
         }
     }
     if (has_default_partition)
         return {};
@@ -479,25 +482,29 @@ std::vector<HiveFilePtr> StorageHive::collectHiveFilesFromPartition(
     if (!reader->pull(block) || !block.rows())
         throw Exception("Could not parse partition value: " + wb.str(), ErrorCodes::INVALID_PARTITION_VALUE);
 
-    std::vector<Range> ranges;
-    ranges.reserve(partition_names.size());
+    /// Get partition values
     FieldVector fields(partition_names.size());
     for (size_t i = 0; i < partition_names.size(); ++i)
-    {
         block.getByPosition(i).column->get(0, fields[i]);
-        ranges.emplace_back(fields[i]);
-    }
 
-    const KeyCondition partition_key_condition(query_info, getContext(), partition_names, partition_minmax_idx_expr);
-    if (!partition_key_condition.checkInHyperrectangle(ranges, partition_types).can_be_true)
-        return {};
+    if (prune_level >= PruneLevel::Partition)
+    {
+        std::vector<Range> ranges;
+        ranges.reserve(partition_names.size());
+        for (size_t i = 0; i < partition_names.size(); ++i)
+            ranges.emplace_back(fields[i]);
+
+        const KeyCondition partition_key_condition(query_info, getContext(), partition_names, partition_minmax_idx_expr);
+        if (!partition_key_condition.checkInHyperrectangle(ranges, partition_types).can_be_true)
+            return {};
+    }
 
+    HiveFiles hive_files;
     auto file_infos = listDirectory(partition.sd.location, hive_table_metadata, fs);
-    std::vector<HiveFilePtr> hive_files;
     hive_files.reserve(file_infos.size());
     for (const auto & file_info : file_infos)
     {
-        auto hive_file = createHiveFileIfNeeded(file_info, fields, query_info, context_);
+        auto hive_file = createHiveFileIfNeeded(file_info, fields, query_info, context_, prune_level);
         if (hive_file)
             hive_files.push_back(hive_file);
     }
@@ -511,12 +518,17 @@ StorageHive::listDirectory(const String & path, HiveTableMetadataPtr hive_table_
 }
 
 HiveFilePtr StorageHive::createHiveFileIfNeeded(
-    const FileInfo & file_info, const FieldVector & fields, SelectQueryInfo & query_info, ContextPtr context_)
+    const FileInfo & file_info,
+    const FieldVector & fields,
+    const SelectQueryInfo & query_info,
+    ContextPtr context_,
+    PruneLevel prune_level) const
 {
-    LOG_TRACE(log, "Append hive file {}", file_info.path);
+    LOG_TRACE(log, "create hive file {} if needed, prune_level:{}", file_info.path, pruneLevelToString(prune_level));
 
     String filename = getBaseName(file_info.path);
-    if (filename.find('.') == 0)
+    /// Skip temporary files starting with '.'
+    if (startsWith(filename, "."))
         return {};
 
     auto hive_file = createHiveFile(
@@ -531,34 +543,44 @@ HiveFilePtr StorageHive::createHiveFileIfNeeded(
         context_);
 
     /// Load file level minmax index and apply
-    const KeyCondition hivefile_key_condition(query_info, getContext(), hivefile_name_types.getNames(), hivefile_minmax_idx_expr);
-    if (hive_file->hasMinMaxIndex())
-    {
-        hive_file->loadMinMaxIndex();
-        if (!hivefile_key_condition.checkInHyperrectangle(hive_file->getMinMaxIndex()->hyperrectangle, hivefile_name_types.getTypes())
-                .can_be_true)
-        {
-            LOG_TRACE(log, "Skip hive file {} by index {}", hive_file->getPath(), hive_file->describeMinMaxIndex(hive_file->getMinMaxIndex()));
-            return {};
-        }
-    }
-
-    /// Load sub-file level minmax index and apply
-    if (hive_file->hasSubMinMaxIndex())
-    {
-        std::set<int> skip_splits;
-        hive_file->loadSubMinMaxIndex();
-        const auto & sub_minmax_idxes = hive_file->getSubMinMaxIndexes();
-        for (size_t i = 0; i < sub_minmax_idxes.size(); ++i)
-        {
-            if (!hivefile_key_condition.checkInHyperrectangle(sub_minmax_idxes[i]->hyperrectangle, hivefile_name_types.getTypes())
-                    .can_be_true)
-            {
-                LOG_TRACE(log, "Skip split {} of hive file {}", i, hive_file->getPath());
-                skip_splits.insert(i);
-            }
-        }
-        hive_file->setSkipSplits(skip_splits);
-    }
+    if (prune_level >= PruneLevel::File)
+    {
+        const KeyCondition hivefile_key_condition(query_info, getContext(), hivefile_name_types.getNames(), hivefile_minmax_idx_expr);
+        if (hive_file->hasMinMaxIndex())
+        {
+            hive_file->loadMinMaxIndex();
+            if (!hivefile_key_condition.checkInHyperrectangle(hive_file->getMinMaxIndex()->hyperrectangle, hivefile_name_types.getTypes())
+                    .can_be_true)
+            {
+                LOG_TRACE(
+                    log,
+                    "Skip hive file {} by index {}",
+                    hive_file->getPath(),
+                    hive_file->describeMinMaxIndex(hive_file->getMinMaxIndex()));
+                return {};
+            }
+        }
+
+        if (prune_level >= PruneLevel::Split)
+        {
+            /// Load sub-file level minmax index and apply
+            if (hive_file->hasSubMinMaxIndex())
+            {
+                std::set<int> skip_splits;
+                hive_file->loadSubMinMaxIndex();
+                const auto & sub_minmax_idxes = hive_file->getSubMinMaxIndexes();
+                for (size_t i = 0; i < sub_minmax_idxes.size(); ++i)
+                {
+                    if (!hivefile_key_condition.checkInHyperrectangle(sub_minmax_idxes[i]->hyperrectangle, hivefile_name_types.getTypes())
+                            .can_be_true)
+                    {
+                        LOG_TRACE(log, "Skip split {} of hive file {}", i, hive_file->getPath());
+                        skip_splits.insert(i);
+                    }
+                }
+                hive_file->setSkipSplits(skip_splits);
+            }
+        }
+    }
     return hive_file;
 }
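
The restructured pruning relies on PruneLevel forming an ordered ladder, None < Partition < File < Split, so a check such as prune_level >= PruneLevel::File enables every stage up to file-level pruning. Scoped enums compare by their underlying values in declaration order, which a standalone sketch can assert (the local Level enum mirrors the one declared in the header below):

    enum class Level { None, Partition, File, Split };

    static_assert(Level::None < Level::Partition, "each level implies the previous ones");
    static_assert(Level::Partition < Level::File, "file pruning sits above partition pruning");
    static_assert(Level::File < Level::Split, "split pruning is the most aggressive level");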
@@ -591,6 +613,7 @@ void StorageHive::getActualColumnsToRead(Block & sample_block, const Block & hea
         }
     }
 }
 
+
 Pipe StorageHive::read(
     const Names & column_names,
     const StorageSnapshotPtr & storage_snapshot,
@@ -607,47 +630,9 @@ Pipe StorageHive::read(
     auto hive_metastore_client = HiveMetastoreClientFactory::instance().getOrCreate(hive_metastore_url, getContext());
     auto hive_table_metadata = hive_metastore_client->getTableMetadata(hive_database, hive_table);
 
-    std::vector<Apache::Hadoop::Hive::Partition> partitions = hive_table_metadata->getPartitions();
-    /// Hive files to read
-    HiveFiles hive_files;
-    /// Mutex to protect hive_files, which may be appended in multiple threads
-    std::mutex hive_files_mutex;
-
-    ThreadPool pool{num_streams};
-    if (!partitions.empty())
-    {
-        for (const auto & partition : partitions)
-        {
-            pool.scheduleOrThrowOnError([&]()
-            {
-                auto hive_files_in_partition = collectHiveFilesFromPartition(partition, query_info, hive_table_metadata, fs, context_);
-                if (!hive_files_in_partition.empty())
-                {
-                    std::lock_guard<std::mutex> lock(hive_files_mutex);
-                    hive_files.insert(std::end(hive_files), std::begin(hive_files_in_partition), std::end(hive_files_in_partition));
-                }
-            });
-        }
-        pool.wait();
-    }
-    else if (partition_name_types.empty()) /// Partition keys are empty
-    {
-        auto file_infos = listDirectory(hive_table_metadata->getTable()->sd.location, hive_table_metadata, fs);
-        for (const auto & file_info : file_infos)
-        {
-            pool.scheduleOrThrowOnError([&]
-            {
-                auto hive_file = createHiveFileIfNeeded(file_info, {}, query_info, context_);
-                if (hive_file)
-                {
-                    std::lock_guard<std::mutex> lock(hive_files_mutex);
-                    hive_files.push_back(hive_file);
-                }
-            });
-        }
-        pool.wait();
-    }
-    else /// Partition keys are not empty but partitions is empty
+    /// Collect Hive files to read
+    HiveFiles hive_files = collectHiveFiles(num_streams, query_info, hive_table_metadata, fs, context_);
+    if (hive_files.empty())
         return {};
 
     auto sources_info = std::make_shared<StorageHiveSource::SourcesInfo>();
@@ -689,6 +674,63 @@ Pipe StorageHive::read(
     return Pipe::unitePipes(std::move(pipes));
 }
 
+HiveFiles StorageHive::collectHiveFiles(
+    unsigned max_threads,
+    const SelectQueryInfo & query_info,
+    HiveTableMetadataPtr hive_table_metadata,
+    const HDFSFSPtr & fs,
+    ContextPtr context_,
+    PruneLevel prune_level) const
+{
+    std::vector<Apache::Hadoop::Hive::Partition> partitions = hive_table_metadata->getPartitions();
+    /// Hive table has no partition
+    if (!partition_name_types.empty() && partitions.empty())
+        return {};
+
+    /// Hive files to collect
+    HiveFiles hive_files;
+    /// Mutex to protect hive_files, which may be appended in multiple threads
+    std::mutex hive_files_mutex;
+    ThreadPool pool{max_threads};
+    if (!partitions.empty())
+    {
+        for (const auto & partition : partitions)
+        {
+            pool.scheduleOrThrowOnError(
+                [&]()
+                {
+                    auto hive_files_in_partition
+                        = collectHiveFilesFromPartition(partition, query_info, hive_table_metadata, fs, context_, prune_level);
+                    if (!hive_files_in_partition.empty())
+                    {
+                        std::lock_guard<std::mutex> lock(hive_files_mutex);
+                        hive_files.insert(std::end(hive_files), std::begin(hive_files_in_partition), std::end(hive_files_in_partition));
+                    }
+                });
+        }
+    }
+    else /// Partition keys are empty but there are still files
+    {
+        auto file_infos = listDirectory(hive_table_metadata->getTable()->sd.location, hive_table_metadata, fs);
+        for (const auto & file_info : file_infos)
+        {
+            pool.scheduleOrThrowOnError(
+                [&]()
+                {
+                    auto hive_file = createHiveFileIfNeeded(file_info, {}, query_info, context_, prune_level);
+                    if (hive_file)
+                    {
+                        std::lock_guard<std::mutex> lock(hive_files_mutex);
+                        hive_files.push_back(hive_file);
+                    }
+                });
+        }
+    }
+    pool.wait();
+    return hive_files;
+}
+
 SinkToStoragePtr StorageHive::write(const ASTPtr & /*query*/, const StorageMetadataPtr & /* metadata_snapshot*/, ContextPtr /*context*/)
 {
     throw Exception("Method write is not implemented for StorageHive", ErrorCodes::NOT_IMPLEMENTED);
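
collectHiveFiles() factors the scatter/gather loop out of read() so the totalRows path below can reuse it: one task per partition (or per file for unpartitioned tables), a mutex guarding the shared result vector, and a single pool.wait() after both branches, where the old code waited inside each branch. The same shape in miniature, with std::thread standing in for ClickHouse's ThreadPool (all names illustrative):

    #include <mutex>
    #include <thread>
    #include <vector>

    std::vector<int> collect(const std::vector<int> & partitions)
    {
        std::vector<int> results;
        std::mutex results_mutex; /// protects results, appended from many threads
        std::vector<std::thread> pool;
        pool.reserve(partitions.size());
        for (int partition : partitions)
        {
            pool.emplace_back([&results, &results_mutex, partition]
            {
                int found = partition * 10; /// stand-in for collectHiveFilesFromPartition()
                std::lock_guard<std::mutex> lock(results_mutex);
                results.push_back(found);
            });
        }
        for (auto & worker : pool)
            worker.join(); /// the single pool.wait() equivalent
        return results;
    }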
@@ -701,6 +743,44 @@ NamesAndTypesList StorageHive::getVirtuals() const
         {"_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())}};
 }
 
+std::optional<UInt64> StorageHive::totalRows(const Settings & settings) const
+{
+    /// query_info is not used when prune_level == PruneLevel::None
+    SelectQueryInfo query_info;
+    return totalRowsImpl(settings, query_info, getContext(), PruneLevel::None);
+}
+
+std::optional<UInt64> StorageHive::totalRowsByPartitionPredicate(const SelectQueryInfo & query_info, ContextPtr context_) const
+{
+    return totalRowsImpl(context_->getSettingsRef(), query_info, context_, PruneLevel::Partition);
+}
+
+std::optional<UInt64>
+StorageHive::totalRowsImpl(const Settings & settings, const SelectQueryInfo & query_info, ContextPtr context_, PruneLevel prune_level) const
+{
+    /// Row-based formats like Text don't support totalRowsByPartitionPredicate
+    if (!isColumnOriented())
+        return {};
+
+    auto hive_metastore_client = HiveMetastoreClientFactory::instance().getOrCreate(hive_metastore_url, getContext());
+    auto hive_table_metadata = hive_metastore_client->getTableMetadata(hive_database, hive_table);
+    HDFSBuilderWrapper builder = createHDFSBuilder(hdfs_namenode_url, getContext()->getGlobalContext()->getConfigRef());
+    HDFSFSPtr fs = createHDFSFS(builder.get());
+    HiveFiles hive_files = collectHiveFiles(settings.max_threads, query_info, hive_table_metadata, fs, context_, prune_level);
+
+    UInt64 total_rows = 0;
+    for (const auto & hive_file : hive_files)
+    {
+        auto file_rows = hive_file->getRows();
+        if (!file_rows)
+            throw Exception(
+                ErrorCodes::LOGICAL_ERROR, "Rows of hive file:{} with format:{} not initialized", hive_file->getPath(), format_name);
+        total_rows += *file_rows;
+    }
+    return total_rows;
+}
+
 
 void registerStorageHive(StorageFactory & factory)
 {
     factory.registerStorage(
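
totalRows() and totalRowsByPartitionPredicate() are the hooks consulted by ClickHouse's trivial-count optimization (the optimize_trivial_count_query setting): when the storage can answer, SELECT count() is served without building a read pipeline. StorageHive now sums the per-file metadata counts collected above, pruning at partition level when a predicate is available and not at all otherwise, so no candidate file is dropped from the total. The patch throws LOGICAL_ERROR when a file cannot report its rows; a more defensive standalone variant would decline instead of throwing (an illustrative sketch, not the patch's behavior):

    #include <cstdint>
    #include <optional>
    #include <vector>

    std::optional<uint64_t> totalRows(const std::vector<std::optional<uint64_t>> & per_file_rows)
    {
        uint64_t total = 0;
        for (const auto & rows : per_file_rows)
        {
            if (!rows)
                return std::nullopt; /// one unknown file makes the whole total unknown
            total += *rows;
        }
        return total;
    }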
@@ -26,7 +26,6 @@ class HiveSettings;
 class StorageHive final : public shared_ptr_helper<StorageHive>, public IStorage, WithContext
 {
     friend struct shared_ptr_helper<StorageHive>;
-
 public:
     String getName() const override { return "Hive"; }
 
@@ -54,6 +53,9 @@ public:
 
     bool isColumnOriented() const override;
 
+    std::optional<UInt64> totalRows(const Settings & settings) const override;
+    std::optional<UInt64> totalRowsByPartitionPredicate(const SelectQueryInfo & query_info, ContextPtr context_) const override;
+
 protected:
     friend class StorageHiveSource;
     StorageHive(
@@ -73,24 +75,56 @@ private:
     using FileInfo = HiveMetastoreClient::FileInfo;
     using HiveTableMetadataPtr = HiveMetastoreClient::HiveTableMetadataPtr;
 
+    enum class PruneLevel
+    {
+        None, /// Do not prune
+        Partition,
+        File,
+        Split,
+        Max = Split,
+    };
+
+    static String pruneLevelToString(PruneLevel level)
+    {
+        return String(magic_enum::enum_name(level));
+    }
+
     static ASTPtr extractKeyExpressionList(const ASTPtr & node);
 
     static std::vector<FileInfo> listDirectory(const String & path, HiveTableMetadataPtr hive_table_metadata, const HDFSFSPtr & fs);
 
     void initMinMaxIndexExpression();
 
-    std::vector<HiveFilePtr> collectHiveFilesFromPartition(
-        const Apache::Hadoop::Hive::Partition & partition,
-        SelectQueryInfo & query_info,
+    HiveFiles collectHiveFiles(
+        unsigned max_threads,
+        const SelectQueryInfo & query_info,
         HiveTableMetadataPtr hive_table_metadata,
         const HDFSFSPtr & fs,
-        ContextPtr context_);
+        ContextPtr context_,
+        PruneLevel prune_level = PruneLevel::Max) const;
 
-    HiveFilePtr
-    createHiveFileIfNeeded(const FileInfo & file_info, const FieldVector & fields, SelectQueryInfo & query_info, ContextPtr context_);
+    HiveFiles collectHiveFilesFromPartition(
+        const Apache::Hadoop::Hive::Partition & partition,
+        const SelectQueryInfo & query_info,
+        HiveTableMetadataPtr hive_table_metadata,
+        const HDFSFSPtr & fs,
+        ContextPtr context_,
+        PruneLevel prune_level = PruneLevel::Max) const;
+
+    HiveFilePtr createHiveFileIfNeeded(
+        const FileInfo & file_info,
+        const FieldVector & fields,
+        const SelectQueryInfo & query_info,
+        ContextPtr context_,
+        PruneLevel prune_level = PruneLevel::Max) const;
 
     void getActualColumnsToRead(Block & sample_block, const Block & header_block, const NameSet & partition_columns) const;
 
+    void lazyInitialize();
+
+    std::optional<UInt64>
+    totalRowsImpl(const Settings & settings, const SelectQueryInfo & query_info, ContextPtr context_, PruneLevel prune_level) const;
+
     String hive_metastore_url;
 
     /// Hive database and table
@@ -122,9 +156,8 @@ private:
     std::shared_ptr<HiveSettings> storage_settings;
 
     Poco::Logger * log = &Poco::Logger::get("StorageHive");
-
-    void lazyInitialize();
 };
 
 }
 
 #endif
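
pruneLevelToString() above leans on magic_enum to turn an enumerator into its identifier for the new log messages. A standalone equivalent, assuming the single-header magic_enum library is available:

    #include <iostream>
    #include <magic_enum.hpp>
    #include <string>

    enum class PruneLevel { None, Partition, File, Split };

    std::string toString(PruneLevel level)
    {
        return std::string(magic_enum::enum_name(level)); /// e.g. "Partition"
    }

    int main()
    {
        std::cout << toString(PruneLevel::Partition) << "\n";
        return 0;
    }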