Mirror of https://github.com/ClickHouse/ClickHouse.git, synced 2024-11-28 02:21:59 +00:00
Merge pull request #36082 from bigo-sg/cache_hive_files
This commit is contained in:
commit 8873d7b0e7
@@ -29,31 +29,6 @@ ThriftHiveMetastoreClientPool::ThriftHiveMetastoreClientPool(ThriftHiveMetastore
 {
 }

-bool HiveMetastoreClient::shouldUpdateTableMetadata(
-    const String & db_name, const String & table_name, const std::vector<Apache::Hadoop::Hive::Partition> & partitions)
-{
-    String cache_key = getCacheKey(db_name, table_name);
-    HiveTableMetadataPtr metadata = table_metadata_cache.get(cache_key);
-    if (!metadata)
-        return true;
-
-    auto old_partiton_infos = metadata->getPartitionInfos();
-    if (old_partiton_infos.size() != partitions.size())
-        return true;
-
-    for (const auto & partition : partitions)
-    {
-        auto it = old_partiton_infos.find(partition.sd.location);
-        if (it == old_partiton_infos.end())
-            return true;
-
-        const auto & old_partition_info = it->second;
-        if (!old_partition_info.haveSameParameters(partition))
-            return true;
-    }
-    return false;
-}
-
 void HiveMetastoreClient::tryCallHiveClient(std::function<void(ThriftHiveMetastoreClientPool::Entry &)> func)
 {
     int i = 0;
@@ -91,44 +66,17 @@ HiveMetastoreClient::HiveTableMetadataPtr HiveMetastoreClient::getTableMetadata(
     };
     tryCallHiveClient(client_call);

-    bool update_cache = shouldUpdateTableMetadata(db_name, table_name, partitions);
+    // bool update_cache = shouldUpdateTableMetadata(db_name, table_name, partitions);
     String cache_key = getCacheKey(db_name, table_name);

     HiveTableMetadataPtr metadata = table_metadata_cache.get(cache_key);
-
-    if (update_cache)
+    if (metadata)
     {
-        LOG_INFO(log, "Reload hive partition metadata info for {}.{}", db_name, table_name);
-
-        /// Generate partition infos from partitions and old partition infos(if exists).
-        std::map<String, PartitionInfo> new_partition_infos;
-        if (metadata)
-        {
-            auto & old_partiton_infos = metadata->getPartitionInfos();
-            for (const auto & partition : partitions)
-            {
-                auto it = old_partiton_infos.find(partition.sd.location);
-                if (it == old_partiton_infos.end() || !it->second.haveSameParameters(partition) || !it->second.initialized)
-                {
-                    new_partition_infos.emplace(partition.sd.location, PartitionInfo(partition));
-                    continue;
-                }
-                else
-                {
-                    PartitionInfo new_partition_info(partition);
-                    new_partition_info.files = std::move(it->second.files);
-                    new_partition_info.initialized = true;
-                }
-            }
-        }
-        else
-        {
-            for (const auto & partition : partitions)
-                new_partition_infos.emplace(partition.sd.location, PartitionInfo(partition));
-        }
-
-        metadata = std::make_shared<HiveMetastoreClient::HiveTableMetadata>(
-            db_name, table_name, table, std::move(new_partition_infos), getContext());
+        metadata->updateIfNeeded(partitions);
+    }
+    else
+    {
+        metadata = std::make_shared<HiveTableMetadata>(db_name, table_name, table, partitions);
         table_metadata_cache.set(cache_key, metadata);
     }
     return metadata;
@@ -157,14 +105,14 @@ void HiveMetastoreClient::clearTableMetadata(const String & db_name, const Strin
 bool HiveMetastoreClient::PartitionInfo::haveSameParameters(const Apache::Hadoop::Hive::Partition & other) const
 {
     /// Parameters include keys:numRows,numFiles,rawDataSize,totalSize,transient_lastDdlTime
-    auto it1 = partition.parameters.cbegin();
-    auto it2 = other.parameters.cbegin();
-    for (; it1 != partition.parameters.cend() && it2 != other.parameters.cend(); ++it1, ++it2)
+    auto it = partition.parameters.cbegin();
+    auto oit = other.parameters.cbegin();
+    for (; it != partition.parameters.cend() && oit != other.parameters.cend(); ++it, ++oit)
     {
-        if (it1->first != it2->first || it1->second != it2->second)
+        if (it->first != oit->first || it->second != oit->second)
             return false;
     }
-    return (it1 == partition.parameters.cend() && it2 == other.parameters.cend());
+    return (it == partition.parameters.cend() && oit == other.parameters.cend());
 }

 std::vector<Apache::Hadoop::Hive::Partition> HiveMetastoreClient::HiveTableMetadata::getPartitions() const
@@ -172,6 +120,7 @@ std::vector<Apache::Hadoop::Hive::Partition> HiveMetastoreClient::HiveTableMetad
     std::vector<Apache::Hadoop::Hive::Partition> result;

     std::lock_guard lock{mutex};
+    result.reserve(partition_infos.size());
     for (const auto & partition_info : partition_infos)
         result.emplace_back(partition_info.second.partition);
     return result;
@@ -220,6 +169,57 @@ std::vector<HiveMetastoreClient::FileInfo> HiveMetastoreClient::HiveTableMetadat
     return result;
 }

+HiveFilesCachePtr HiveMetastoreClient::HiveTableMetadata::getHiveFilesCache() const
+{
+    return hive_files_cache;
+}
+
+void HiveMetastoreClient::HiveTableMetadata::updateIfNeeded(const std::vector<Apache::Hadoop::Hive::Partition> & partitions)
+{
+    std::lock_guard lock{mutex};
+
+    if (!shouldUpdate(partitions))
+        return;
+
+    std::map<String, PartitionInfo> new_partition_infos;
+    auto & old_partiton_infos = partition_infos;
+    for (const auto & partition : partitions)
+    {
+        auto it = old_partiton_infos.find(partition.sd.location);
+        if (it == old_partiton_infos.end() || !it->second.haveSameParameters(partition) || !it->second.initialized)
+        {
+            new_partition_infos.emplace(partition.sd.location, PartitionInfo(partition));
+            continue;
+        }
+        else
+        {
+            new_partition_infos.emplace(partition.sd.location, std::move(it->second));
+        }
+    }
+
+    partition_infos.swap(new_partition_infos);
+}
+
+bool HiveMetastoreClient::HiveTableMetadata::shouldUpdate(const std::vector<Apache::Hadoop::Hive::Partition> & partitions)
+{
+    const auto & old_partiton_infos = partition_infos;
+    if (old_partiton_infos.size() != partitions.size())
+        return true;
+
+    for (const auto & partition : partitions)
+    {
+        auto it = old_partiton_infos.find(partition.sd.location);
+        if (it == old_partiton_infos.end())
+            return true;
+
+        const auto & old_partition_info = it->second;
+        if (!old_partition_info.haveSameParameters(partition))
+            return true;
+    }
+    return false;
+}
+
+
 HiveMetastoreClientFactory & HiveMetastoreClientFactory::instance()
 {
     static HiveMetastoreClientFactory factory;
@@ -231,9 +231,8 @@ using namespace apache::thrift::protocol;
 using namespace apache::thrift::transport;
 using namespace Apache::Hadoop::Hive;

-HiveMetastoreClientPtr HiveMetastoreClientFactory::getOrCreate(const String & name, ContextPtr context)
+HiveMetastoreClientPtr HiveMetastoreClientFactory::getOrCreate(const String & name)
 {
-
     std::lock_guard lock(mutex);
     auto it = clients.find(name);
     if (it == clients.end())
@@ -242,12 +241,13 @@ HiveMetastoreClientPtr HiveMetastoreClientFactory::getOrCreate(const String & na
         {
             return createThriftHiveMetastoreClient(name);
         };
-        auto client = std::make_shared<HiveMetastoreClient>(builder, context->getGlobalContext());
-        clients[name] = client;
+        auto client = std::make_shared<HiveMetastoreClient>(builder);
+        clients.emplace(name, client);
         return client;
     }
     return it->second;
 }

 std::shared_ptr<ThriftHiveMetastoreClient> HiveMetastoreClientFactory::createThriftHiveMetastoreClient(const String &name)
 {
     Poco::URI hive_metastore_url(name);
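Note: the hunks above appear to come from HiveMetastoreClient.cpp (inferred from the definitions). The per-table metadata object is now updated in place: getTableMetadata() reuses the cached HiveTableMetadata and calls updateIfNeeded(), which rebuilds partition_infos only when shouldUpdate() sees a different partition set, and keeps the already-listed files of partitions whose parameters are unchanged. Below is a simplified, self-contained sketch of that merge step; every name in it is a stand-in for illustration, not the ClickHouse types.

// Sketch only: merge a fresh partition listing into a cached map, keeping entries
// whose Hive parameters are unchanged and whose file list was already initialized.
#include <map>
#include <string>
#include <utility>

struct PartitionInfo
{
    std::map<std::string, std::string> parameters; // numRows, totalSize, transient_lastDdlTime, ...
    bool initialized = false;                      // files already listed for this partition
};

void updateIfNeeded(std::map<std::string, PartitionInfo> & cached,
                    const std::map<std::string, PartitionInfo> & fresh)
{
    std::map<std::string, PartitionInfo> merged;
    for (const auto & [location, info] : fresh)
    {
        auto it = cached.find(location);
        if (it == cached.end() || it->second.parameters != info.parameters || !it->second.initialized)
            merged.emplace(location, info);                  // new or changed: start from scratch
        else
            merged.emplace(location, std::move(it->second)); // unchanged: keep the cached files
    }
    cached.swap(merged);
}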
@@ -13,6 +13,7 @@
 #include <Common/LRUCache.h>
 #include <Common/PoolBase.h>
 #include <Storages/HDFS/HDFSCommon.h>
+#include <Storages/Hive/HiveFile.h>


 namespace DB
@@ -37,10 +38,9 @@ protected:
 private:
     ThriftHiveMetastoreClientBuilder builder;
 };
-class HiveMetastoreClient : public WithContext
+class HiveMetastoreClient
 {
 public:
-
     struct FileInfo
     {
         String path;
@@ -63,68 +63,68 @@ public:
         bool initialized = false; /// If true, files are initialized.

         explicit PartitionInfo(const Apache::Hadoop::Hive::Partition & partition_): partition(partition_) {}
+        PartitionInfo(PartitionInfo &&) = default;

         bool haveSameParameters(const Apache::Hadoop::Hive::Partition & other) const;
     };

+    class HiveTableMetadata;
+    using HiveTableMetadataPtr = std::shared_ptr<HiveTableMetadata>;
+
     /// Used for speeding up metadata query process.
-    struct HiveTableMetadata : public WithContext
+    class HiveTableMetadata : boost::noncopyable
     {
     public:
         HiveTableMetadata(
             const String & db_name_,
             const String & table_name_,
             std::shared_ptr<Apache::Hadoop::Hive::Table> table_,
-            const std::map<String, PartitionInfo> & partition_infos_,
-            ContextPtr context_)
-            : WithContext(context_)
-            , db_name(db_name_)
+            const std::vector<Apache::Hadoop::Hive::Partition> & partitions_)
+            : db_name(db_name_)
             , table_name(table_name_)
-            , table(table_)
-            , partition_infos(partition_infos_)
+            , table(std::move(table_))
             , empty_partition_keys(table->partitionKeys.empty())
+            , hive_files_cache(std::make_shared<HiveFilesCache>(10000))
         {
+            std::lock_guard lock(mutex);
+            for (const auto & partition : partitions_)
+                partition_infos.emplace(partition.sd.location, PartitionInfo(partition));
         }

+        std::shared_ptr<Apache::Hadoop::Hive::Table> getTable() const { return table; }
-        std::map<String, PartitionInfo> & getPartitionInfos()
-        {
-            std::lock_guard lock{mutex};
-            return partition_infos;
-        }
-
-        std::shared_ptr<Apache::Hadoop::Hive::Table> getTable() const
-        {
-            std::lock_guard lock{mutex};
-            return table;
-        }

         std::vector<Apache::Hadoop::Hive::Partition> getPartitions() const;

         std::vector<FileInfo> getFilesByLocation(const HDFSFSPtr & fs, const String & location);

-    private:
-        String db_name;
-        String table_name;
+        HiveFilesCachePtr getHiveFilesCache() const;
+
+        void updateIfNeeded(const std::vector<Apache::Hadoop::Hive::Partition> & partitions);
+
+    private:
+        bool shouldUpdate(const std::vector<Apache::Hadoop::Hive::Partition> & partitions);

+        const String db_name;
+        const String table_name;
+        const std::shared_ptr<Apache::Hadoop::Hive::Table> table;
+
+        /// Mutex to protect partition_infos.
         mutable std::mutex mutex;
-        std::shared_ptr<Apache::Hadoop::Hive::Table> table;
         std::map<String, PartitionInfo> partition_infos;

         const bool empty_partition_keys;
+        const HiveFilesCachePtr hive_files_cache;

         Poco::Logger * log = &Poco::Logger::get("HiveMetastoreClient");
     };

-    using HiveTableMetadataPtr = std::shared_ptr<HiveMetastoreClient::HiveTableMetadata>;
-
-    explicit HiveMetastoreClient(ThriftHiveMetastoreClientBuilder builder_, ContextPtr context_)
-        : WithContext(context_)
-        , table_metadata_cache(1000)
+    explicit HiveMetastoreClient(ThriftHiveMetastoreClientBuilder builder_)
+        : table_metadata_cache(1000)
         , client_pool(builder_)
     {
     }


     HiveTableMetadataPtr getTableMetadata(const String & db_name, const String & table_name);
     // Access hive table information by hive client
     std::shared_ptr<Apache::Hadoop::Hive::Table> getHiveTable(const String & db_name, const String & table_name);
@@ -133,9 +133,6 @@ public:
 private:
     static String getCacheKey(const String & db_name, const String & table_name) { return db_name + "." + table_name; }

-    bool shouldUpdateTableMetadata(
-        const String & db_name, const String & table_name, const std::vector<Apache::Hadoop::Hive::Partition> & partitions);
-
     void tryCallHiveClient(std::function<void(ThriftHiveMetastoreClientPool::Entry &)> func);

     LRUCache<String, HiveTableMetadata> table_metadata_cache;
@@ -150,11 +147,11 @@ class HiveMetastoreClientFactory final : private boost::noncopyable
 public:
     static HiveMetastoreClientFactory & instance();

-    HiveMetastoreClientPtr getOrCreate(const String & name, ContextPtr context);
+    HiveMetastoreClientPtr getOrCreate(const String & name);

-    static std::shared_ptr<Apache::Hadoop::Hive::ThriftHiveMetastoreClient> createThriftHiveMetastoreClient(const String & name);
-
 private:
+    static std::shared_ptr<Apache::Hadoop::Hive::ThriftHiveMetastoreClient> createThriftHiveMetastoreClient(const String & name);
+
     std::mutex mutex;
     std::map<String, HiveMetastoreClientPtr> clients;
 };
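Note: the hunks above appear to be HiveMetastoreClient.h. HiveTableMetadata no longer derives from WithContext; it is built directly from the partition list and now owns a per-table HiveFilesCache (an LRUCache keyed by file path, capacity 10000) through which IHiveFile objects are reused across queries. The sketch below shows the lookup policy this enables, a cache hit only while the file's modification time has not advanced; the types and the getOrSet() helper are simplified stand-ins, not the real LRUCache API.

// Sketch only: path-keyed cache whose entries are rebuilt when the file looks newer.
#include <cstdint>
#include <memory>
#include <string>
#include <unordered_map>

struct CachedFile
{
    std::string path;
    uint64_t last_modify_time = 0; // filled in by the builder when the entry is created
};

class FileCache
{
public:
    template <typename Builder>
    std::shared_ptr<CachedFile> getOrSet(const std::string & path, uint64_t observed_mtime, Builder build)
    {
        auto it = entries.find(path);
        if (it != entries.end() && it->second->last_modify_time >= observed_mtime)
            return it->second;     // hit: cached object is still fresh
        auto file = build();       // miss or stale: rebuild and replace the entry
        entries[path] = file;
        return file;
    }

private:
    std::unordered_map<std::string, std::shared_ptr<CachedFile>> entries; // stand-in for an LRU cache
};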
@@ -77,6 +77,29 @@ Range createRangeFromParquetStatistics(std::shared_ptr<parquet::ByteArrayStatist
     return Range(min_val, true, max_val, true);
 }

+std::optional<size_t> IHiveFile::getRows()
+{
+    if (!rows)
+        rows = getRowsImpl();
+    return rows;
+}
+
+void IHiveFile::loadFileMinMaxIndex()
+{
+    if (file_minmax_idx_loaded)
+        return;
+    loadFileMinMaxIndexImpl();
+    file_minmax_idx_loaded = true;
+}
+
+void IHiveFile::loadSplitMinMaxIndexes()
+{
+    if (split_minmax_idxes_loaded)
+        return;
+    loadSplitMinMaxIndexesImpl();
+    split_minmax_idxes_loaded = true;
+}
+
 Range HiveORCFile::buildRange(const orc::ColumnStatistics * col_stats)
 {
     if (!col_stats || col_stats->hasNull())
@@ -183,8 +206,7 @@ std::unique_ptr<IMergeTreeDataPart::MinMaxIndex> HiveORCFile::buildMinMaxIndex(c
     return idx;
 }

-void HiveORCFile::loadFileMinMaxIndex()
+void HiveORCFile::loadFileMinMaxIndexImpl()
 {
     if (!reader)
     {
@@ -202,7 +224,7 @@ bool HiveORCFile::useSplitMinMaxIndex() const
 }


-void HiveORCFile::loadSplitMinMaxIndex()
+void HiveORCFile::loadSplitMinMaxIndexesImpl()
 {
     if (!reader)
     {
@@ -226,6 +248,18 @@ void HiveORCFile::loadSplitMinMaxIndex()
     }
 }

+std::optional<size_t> HiveORCFile::getRowsImpl()
+{
+    if (!reader)
+    {
+        prepareReader();
+        prepareColumnMapping();
+    }
+
+    auto * raw_reader = reader->GetRawORCReader();
+    return raw_reader->getNumberOfRows();
+}
+
 bool HiveParquetFile::useSplitMinMaxIndex() const
 {
     return storage_settings->enable_parquet_rowgroup_minmax_index;
@@ -239,7 +273,7 @@ void HiveParquetFile::prepareReader()
     THROW_ARROW_NOT_OK(parquet::arrow::OpenFile(asArrowFile(*in, format_settings, is_stopped), arrow::default_memory_pool(), &reader));
 }

-void HiveParquetFile::loadSplitMinMaxIndex()
+void HiveParquetFile::loadSplitMinMaxIndexesImpl()
 {
     if (!reader)
         prepareReader();
@@ -312,5 +346,14 @@ void HiveParquetFile::loadSplitMinMaxIndex()
     }
 }

+std::optional<size_t> HiveParquetFile::getRowsImpl()
+{
+    if (!reader)
+        prepareReader();
+
+    auto meta = reader->parquet_reader()->metadata();
+    return meta->num_rows();
+}
+
 }
 #endif
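Note: the hunks above appear to be HiveFile.cpp. Each file now remembers its row count: getRows() asks the format-specific getRowsImpl() once (ORC via the raw reader's getNumberOfRows(), Parquet via the footer's num_rows()) and memoizes the result, which is what the new totalRows() path later sums. A minimal sketch of the memoization pattern follows; class and member names are illustrative only.

// Sketch only: compute the per-file row count once and cache it in std::optional.
#include <cstddef>
#include <optional>

class FileWithRows
{
public:
    virtual ~FileWithRows() = default;

    std::optional<size_t> getRows()
    {
        if (!rows)
            rows = getRowsImpl(); // e.g. ORC raw reader or Parquet footer metadata
        return rows;
    }

protected:
    virtual std::optional<size_t> getRowsImpl() = 0;

private:
    std::optional<size_t> rows; // empty until the first call
};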
@@ -83,7 +83,7 @@ public:
         size_t size_,
         const NamesAndTypesList & index_names_and_types_,
         const std::shared_ptr<HiveSettings> & storage_settings_,
-        ContextPtr context_)
+        const ContextPtr & context_)
         : WithContext(context_)
         , partition_values(partition_values_)
         , namenode_url(namenode_url_)
@@ -100,6 +100,7 @@ public:
     const String & getPath() const { return path; }
     UInt64 getLastModTs() const { return last_modify_time; }
     size_t getSize() const { return size; }
+    std::optional<size_t> getRows();
     const FieldVector & getPartitionValues() const { return partition_values; }
     const String & getNamenodeUrl() { return namenode_url; }
     MinMaxIndexPtr getMinMaxIndex() const { return file_minmax_idx; }
@@ -112,7 +113,6 @@ public:
     {
         if (!idx)
             return "";
-
         std::vector<String> strs;
         strs.reserve(index_names_and_types.size());
         size_t i = 0;
@@ -123,30 +123,42 @@ public:

     virtual FileFormat getFormat() const = 0;

+    /// If hive query could use file level minmax index?
     virtual bool useFileMinMaxIndex() const { return false; }
+    void loadFileMinMaxIndex();

-    virtual void loadFileMinMaxIndex()
-    {
-        throw Exception("Method loadFileMinMaxIndex is not supported by hive file:" + getFormatName(), ErrorCodes::NOT_IMPLEMENTED);
-    }
-
-    /// If hive query could use contains sub-file level minmax index?
+    /// If hive query could use sub-file level minmax index?
     virtual bool useSplitMinMaxIndex() const { return false; }
+    void loadSplitMinMaxIndexes();

-    virtual void loadSplitMinMaxIndex()
-    {
-        throw Exception("Method loadSplitMinMaxIndex is not supported by hive file:" + getFormatName(), ErrorCodes::NOT_IMPLEMENTED);
-    }
-
 protected:
+    virtual void loadFileMinMaxIndexImpl()
+    {
+        throw Exception("Method loadFileMinMaxIndexImpl is not supported by hive file:" + getFormatName(), ErrorCodes::NOT_IMPLEMENTED);
+    }
+
+    virtual void loadSplitMinMaxIndexesImpl()
+    {
+        throw Exception("Method loadSplitMinMaxIndexesImpl is not supported by hive file:" + getFormatName(), ErrorCodes::NOT_IMPLEMENTED);
+    }
+
+    virtual std::optional<size_t> getRowsImpl() = 0;
+
     FieldVector partition_values;
     String namenode_url;
     String path;
     UInt64 last_modify_time;
     size_t size;
+    std::optional<size_t> rows;

     NamesAndTypesList index_names_and_types;

     MinMaxIndexPtr file_minmax_idx;
+    std::atomic<bool> file_minmax_idx_loaded{false};

     std::vector<MinMaxIndexPtr> split_minmax_idxes;
+    std::atomic<bool> split_minmax_idxes_loaded{false};

     /// Skip splits for this file after applying minmax index (if any)
     std::unordered_set<int> skip_splits;
     std::shared_ptr<HiveSettings> storage_settings;
@@ -154,6 +166,8 @@ protected:

 using HiveFilePtr = std::shared_ptr<IHiveFile>;
 using HiveFiles = std::vector<HiveFilePtr>;
+using HiveFilesCache = LRUCache<String, IHiveFile>;
+using HiveFilesCachePtr = std::shared_ptr<HiveFilesCache>;

 class HiveTextFile : public IHiveFile
 {
@@ -166,12 +180,15 @@ public:
         size_t size_,
         const NamesAndTypesList & index_names_and_types_,
         const std::shared_ptr<HiveSettings> & hive_settings_,
-        ContextPtr context_)
+        const ContextPtr & context_)
         : IHiveFile(partition_values_, namenode_url_, path_, last_modify_time_, size_, index_names_and_types_, hive_settings_, context_)
     {
     }

-    virtual FileFormat getFormat() const override { return FileFormat::TEXT; }
+    FileFormat getFormat() const override { return FileFormat::TEXT; }
+
+private:
+    std::optional<size_t> getRowsImpl() override { return {}; }
 };

 class HiveORCFile : public IHiveFile
@@ -185,25 +202,26 @@ public:
         size_t size_,
         const NamesAndTypesList & index_names_and_types_,
         const std::shared_ptr<HiveSettings> & hive_settings_,
-        ContextPtr context_)
+        const ContextPtr & context_)
         : IHiveFile(partition_values_, namenode_url_, path_, last_modify_time_, size_, index_names_and_types_, hive_settings_, context_)
     {
     }

     FileFormat getFormat() const override { return FileFormat::ORC; }
     bool useFileMinMaxIndex() const override;
-    void loadFileMinMaxIndex() override;

     bool useSplitMinMaxIndex() const override;
-    void loadSplitMinMaxIndex() override;

 private:
     static Range buildRange(const orc::ColumnStatistics * col_stats);

+    void loadFileMinMaxIndexImpl() override;
+    void loadSplitMinMaxIndexesImpl() override;
     std::unique_ptr<MinMaxIndex> buildMinMaxIndex(const orc::Statistics * statistics);
     void prepareReader();
     void prepareColumnMapping();

+    std::optional<size_t> getRowsImpl() override;
+
     std::unique_ptr<ReadBufferFromHDFS> in;
     std::unique_ptr<arrow::adapters::orc::ORCFileReader> reader;
     std::map<String, size_t> orc_column_positions;
@@ -220,17 +238,17 @@ public:
         size_t size_,
         const NamesAndTypesList & index_names_and_types_,
         const std::shared_ptr<HiveSettings> & hive_settings_,
-        ContextPtr context_)
+        const ContextPtr & context_)
         : IHiveFile(partition_values_, namenode_url_, path_, last_modify_time_, size_, index_names_and_types_, hive_settings_, context_)
     {
     }

     FileFormat getFormat() const override { return FileFormat::PARQUET; }

     bool useSplitMinMaxIndex() const override;
-    void loadSplitMinMaxIndex() override;

 private:
+    void loadSplitMinMaxIndexesImpl() override;
+    std::optional<size_t> getRowsImpl() override;
     void prepareReader();

     std::unique_ptr<ReadBufferFromHDFS> in;
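Note: the hunks above appear to be HiveFile.h. Index loading moves to a non-virtual wrapper plus a protected Impl hook: callers always go through loadFileMinMaxIndex()/loadSplitMinMaxIndexes(), and the *_loaded flags make repeated calls on a cached (now shared) file object cheap. A condensed sketch of that shape is below; the names are hypothetical and it assumes the flag only guards against redundant reloads.

// Sketch only: non-virtual wrapper, virtual Impl hook, load-once flag.
#include <atomic>

class FileIndexLoader
{
public:
    virtual ~FileIndexLoader() = default;

    void loadFileMinMaxIndex()
    {
        if (file_minmax_idx_loaded)
            return;                 // already loaded for this cached object
        loadFileMinMaxIndexImpl();  // format-specific work lives in the override
        file_minmax_idx_loaded = true;
    }

protected:
    virtual void loadFileMinMaxIndexImpl() = 0;

private:
    std::atomic<bool> file_minmax_idx_loaded{false};
};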
@@ -44,6 +44,7 @@ namespace ErrorCodes
     extern const int INVALID_PARTITION_VALUE;
     extern const int BAD_ARGUMENTS;
     extern const int CANNOT_OPEN_FILE;
+    extern const int LOGICAL_ERROR;
 }


@@ -60,7 +61,7 @@ public:
     struct SourcesInfo
     {
         HiveMetastoreClientPtr hive_metastore_client;
-        std::string database;
+        std::string database_name;
         std::string table_name;
         HiveFiles hive_files;
         NamesAndTypesList partition_name_types;
@@ -169,7 +170,7 @@ public:
             {
                 if (e.code() == ErrorCodes::CANNOT_OPEN_FILE)
                 {
-                    source_info->hive_metastore_client->clearTableMetadata(source_info->database, source_info->table_name);
+                    source_info->hive_metastore_client->clearTableMetadata(source_info->database_name, source_info->table_name);
                     throw;
                 }
             }
@@ -307,6 +308,8 @@ StorageHive::StorageHive(
     storage_metadata.setColumns(columns_);
     storage_metadata.setConstraints(constraints_);
     storage_metadata.setComment(comment_);
+    storage_metadata.partition_key = KeyDescription::getKeyFromAST(partition_by_ast, storage_metadata.columns, getContext());
+
     setInMemoryMetadata(storage_metadata);
 }

@@ -316,7 +319,7 @@ void StorageHive::lazyInitialize()
     if (has_initialized)
         return;

-    auto hive_metastore_client = HiveMetastoreClientFactory::instance().getOrCreate(hive_metastore_url, getContext());
+    auto hive_metastore_client = HiveMetastoreClientFactory::instance().getOrCreate(hive_metastore_url);
     auto hive_table_metadata = hive_metastore_client->getHiveTable(hive_database, hive_table);

     hdfs_namenode_url = getNameNodeUrl(hive_table_metadata->sd.location);
@@ -412,7 +415,7 @@ ASTPtr StorageHive::extractKeyExpressionList(const ASTPtr & node)
 }


-HiveFilePtr createHiveFile(
+static HiveFilePtr createHiveFile(
     const String & format_name,
     const FieldVector & fields,
     const String & namenode_url,
@@ -421,7 +424,7 @@ HiveFilePtr createHiveFile(
     size_t size,
     const NamesAndTypesList & index_names_and_types,
     const std::shared_ptr<HiveSettings> & hive_settings,
-    ContextPtr context)
+    const ContextPtr & context)
 {
     HiveFilePtr hive_file;
     if (format_name == "HiveText")
@@ -443,24 +446,26 @@ HiveFilePtr createHiveFile(
     return hive_file;
 }

-std::vector<HiveFilePtr> StorageHive::collectHiveFilesFromPartition(
+HiveFiles StorageHive::collectHiveFilesFromPartition(
     const Apache::Hadoop::Hive::Partition & partition,
-    SelectQueryInfo & query_info,
-    HiveTableMetadataPtr hive_table_metadata,
+    const SelectQueryInfo & query_info,
+    const HiveTableMetadataPtr & hive_table_metadata,
     const HDFSFSPtr & fs,
-    ContextPtr context_)
+    const ContextPtr & context_,
+    PruneLevel prune_level) const
 {
-    LOG_DEBUG(log, "Collect hive files from partition {}", boost::join(partition.values, ","));
+    LOG_DEBUG(
+        log, "Collect hive files from partition {}, prune_level:{}", boost::join(partition.values, ","), pruneLevelToString(prune_level));

     /// Skip partition "__HIVE_DEFAULT_PARTITION__"
     bool has_default_partition = false;
     for (const auto & value : partition.values)
     {
         if (value == "__HIVE_DEFAULT_PARTITION__")
         {
             has_default_partition = true;
             break;
         }
     }
     if (has_default_partition)
         return {};
@@ -490,95 +495,133 @@ std::vector<HiveFilePtr> StorageHive::collectHiveFilesFromPartition(
     if (!reader->pull(block) || !block.rows())
         throw Exception("Could not parse partition value: " + wb.str(), ErrorCodes::INVALID_PARTITION_VALUE);

-    std::vector<Range> ranges;
-    ranges.reserve(partition_names.size());
+    /// Get partition values
     FieldVector fields(partition_names.size());
     for (size_t i = 0; i < partition_names.size(); ++i)
-    {
         block.getByPosition(i).column->get(0, fields[i]);
-        ranges.emplace_back(fields[i]);
+
+    if (prune_level >= PruneLevel::Partition)
+    {
+        std::vector<Range> ranges;
+        ranges.reserve(partition_names.size());
+        for (size_t i = 0; i < partition_names.size(); ++i)
+            ranges.emplace_back(fields[i]);
+
+        const KeyCondition partition_key_condition(query_info, getContext(), partition_names, partition_minmax_idx_expr);
+        if (!partition_key_condition.checkInHyperrectangle(ranges, partition_types).can_be_true)
+            return {};
     }

-    const KeyCondition partition_key_condition(query_info, getContext(), partition_names, partition_minmax_idx_expr);
-    if (!partition_key_condition.checkInHyperrectangle(ranges, partition_types).can_be_true)
-        return {};
-
+    HiveFiles hive_files;
     auto file_infos = listDirectory(partition.sd.location, hive_table_metadata, fs);
-    std::vector<HiveFilePtr> hive_files;
     hive_files.reserve(file_infos.size());
     for (const auto & file_info : file_infos)
     {
-        auto hive_file = createHiveFileIfNeeded(file_info, fields, query_info, context_);
+        auto hive_file = getHiveFileIfNeeded(file_info, fields, query_info, hive_table_metadata, context_, prune_level);
         if (hive_file)
+        {
+            LOG_TRACE(
+                log,
+                "Append hive file {} from partition {}, prune_level:{}",
+                hive_file->getPath(),
+                boost::join(partition.values, ","),
+                pruneLevelToString(prune_level));
             hive_files.push_back(hive_file);
+        }
     }
     return hive_files;
 }

 std::vector<StorageHive::FileInfo>
-StorageHive::listDirectory(const String & path, HiveTableMetadataPtr hive_table_metadata, const HDFSFSPtr & fs)
+StorageHive::listDirectory(const String & path, const HiveTableMetadataPtr & hive_table_metadata, const HDFSFSPtr & fs)
 {
     return hive_table_metadata->getFilesByLocation(fs, path);
 }

-HiveFilePtr StorageHive::createHiveFileIfNeeded(
-    const FileInfo & file_info, const FieldVector & fields, SelectQueryInfo & query_info, ContextPtr context_)
+HiveFilePtr StorageHive::getHiveFileIfNeeded(
+    const FileInfo & file_info,
+    const FieldVector & fields,
+    const SelectQueryInfo & query_info,
+    const HiveTableMetadataPtr & hive_table_metadata,
+    const ContextPtr & context_,
+    PruneLevel prune_level) const
 {
-    LOG_TRACE(log, "Append hive file {}", file_info.path);
     String filename = getBaseName(file_info.path);
     /// Skip temporary files starts with '.'
-    if (filename.find('.') == 0)
+    if (startsWith(filename, "."))
         return {};

-    auto hive_file = createHiveFile(
-        format_name,
-        fields,
-        hdfs_namenode_url,
-        file_info.path,
-        file_info.last_modify_time,
-        file_info.size,
-        hivefile_name_types,
-        storage_settings,
-        context_);
-
-    /// Load file level minmax index and apply
-    const KeyCondition hivefile_key_condition(query_info, getContext(), hivefile_name_types.getNames(), hivefile_minmax_idx_expr);
-    if (hive_file->useFileMinMaxIndex())
+    auto cache = hive_table_metadata->getHiveFilesCache();
+    auto hive_file = cache->get(file_info.path);
+    if (!hive_file || hive_file->getLastModTs() < file_info.last_modify_time)
     {
-        hive_file->loadFileMinMaxIndex();
-        if (!hivefile_key_condition.checkInHyperrectangle(hive_file->getMinMaxIndex()->hyperrectangle, hivefile_name_types.getTypes())
-                 .can_be_true)
-        {
-            LOG_TRACE(
-                log, "Skip hive file {} by index {}", hive_file->getPath(), hive_file->describeMinMaxIndex(hive_file->getMinMaxIndex()));
-            return {};
-        }
+        LOG_TRACE(log, "Create hive file {}, prune_level {}", file_info.path, pruneLevelToString(prune_level));
+        hive_file = createHiveFile(
+            format_name,
+            fields,
+            hdfs_namenode_url,
+            file_info.path,
+            file_info.last_modify_time,
+            file_info.size,
+            hivefile_name_types,
+            storage_settings,
+            context_->getGlobalContext());
+        cache->set(file_info.path, hive_file);
+    }
+    else
+    {
+        LOG_TRACE(log, "Get hive file {} from cache, prune_level {}", file_info.path, pruneLevelToString(prune_level));
     }

-    /// Load sub-file level minmax index and apply
-    if (hive_file->useSplitMinMaxIndex())
+    if (prune_level >= PruneLevel::File)
     {
-        std::unordered_set<int> skip_splits;
-        hive_file->loadSplitMinMaxIndex();
-        const auto & sub_minmax_idxes = hive_file->getSubMinMaxIndexes();
-        for (size_t i = 0; i < sub_minmax_idxes.size(); ++i)
+        const KeyCondition hivefile_key_condition(query_info, getContext(), hivefile_name_types.getNames(), hivefile_minmax_idx_expr);
+        if (hive_file->useFileMinMaxIndex())
         {
-            if (!hivefile_key_condition.checkInHyperrectangle(sub_minmax_idxes[i]->hyperrectangle, hivefile_name_types.getTypes())
+            /// Load file level minmax index and apply
+            hive_file->loadFileMinMaxIndex();
+            if (!hivefile_key_condition.checkInHyperrectangle(hive_file->getMinMaxIndex()->hyperrectangle, hivefile_name_types.getTypes())
                      .can_be_true)
             {
                 LOG_TRACE(
                     log,
-                    "Skip split {} of hive file {} by index {}",
-                    i,
+                    "Skip hive file {} by index {}",
                     hive_file->getPath(),
-                    hive_file->describeMinMaxIndex(sub_minmax_idxes[i]));
-                skip_splits.insert(i);
+                    hive_file->describeMinMaxIndex(hive_file->getMinMaxIndex()));
+                return {};
+            }
+        }
+
+        if (prune_level >= PruneLevel::Split)
+        {
+            if (hive_file->useSplitMinMaxIndex())
+            {
+                /// Load sub-file level minmax index and apply
+                std::unordered_set<int> skip_splits;
+                hive_file->loadSplitMinMaxIndexes();
+                const auto & sub_minmax_idxes = hive_file->getSubMinMaxIndexes();
+                for (size_t i = 0; i < sub_minmax_idxes.size(); ++i)
+                {
+                    if (!hivefile_key_condition.checkInHyperrectangle(sub_minmax_idxes[i]->hyperrectangle, hivefile_name_types.getTypes())
+                             .can_be_true)
+                    {
+                        LOG_TRACE(
+                            log,
+                            "Skip split {} of hive file {} by index {}",
+                            i,
+                            hive_file->getPath(),
+                            hive_file->describeMinMaxIndex(sub_minmax_idxes[i]));
+
+                        skip_splits.insert(i);
+                    }
+                }
+                hive_file->setSkipSplits(skip_splits);
             }
         }
-        hive_file->setSkipSplits(skip_splits);
     }
     return hive_file;
 }

 bool StorageHive::isColumnOriented() const
 {
     return format_name == "Parquet" || format_name == "ORC";
@@ -607,6 +650,7 @@ void StorageHive::getActualColumnsToRead(Block & sample_block, const Block & hea
         }
     }
 }
+
 Pipe StorageHive::read(
     const Names & column_names,
     const StorageSnapshotPtr & storage_snapshot,
@@ -620,55 +664,17 @@ Pipe StorageHive::read(

     HDFSBuilderWrapper builder = createHDFSBuilder(hdfs_namenode_url, context_->getGlobalContext()->getConfigRef());
     HDFSFSPtr fs = createHDFSFS(builder.get());
-    auto hive_metastore_client = HiveMetastoreClientFactory::instance().getOrCreate(hive_metastore_url, getContext());
+    auto hive_metastore_client = HiveMetastoreClientFactory::instance().getOrCreate(hive_metastore_url);
     auto hive_table_metadata = hive_metastore_client->getTableMetadata(hive_database, hive_table);

-    std::vector<Apache::Hadoop::Hive::Partition> partitions = hive_table_metadata->getPartitions();
-    /// Hive files to read
-    HiveFiles hive_files;
-    /// Mutext to protect hive_files, which maybe appended in multiple threads
-    std::mutex hive_files_mutex;
-
-    ThreadPool pool{num_streams};
-    if (!partitions.empty())
-    {
-        for (const auto & partition : partitions)
-        {
-            pool.scheduleOrThrowOnError([&]()
-            {
-                auto hive_files_in_partition = collectHiveFilesFromPartition(partition, query_info, hive_table_metadata, fs, context_);
-                if (!hive_files_in_partition.empty())
-                {
-                    std::lock_guard<std::mutex> lock(hive_files_mutex);
-                    hive_files.insert(std::end(hive_files), std::begin(hive_files_in_partition), std::end(hive_files_in_partition));
-                }
-            });
-        }
-        pool.wait();
-    }
-    else if (partition_name_types.empty()) /// Partition keys is empty
-    {
-        auto file_infos = listDirectory(hive_table_metadata->getTable()->sd.location, hive_table_metadata, fs);
-        for (const auto & file_info : file_infos)
-        {
-            pool.scheduleOrThrowOnError([&]
-            {
-                auto hive_file = createHiveFileIfNeeded(file_info, {}, query_info, context_);
-                if (hive_file)
-                {
-                    std::lock_guard<std::mutex> lock(hive_files_mutex);
-                    hive_files.push_back(hive_file);
-                }
-            });
-        }
-        pool.wait();
-    }
-    else /// Partition keys is not empty but partitions is empty
+    /// Collect Hive files to read
+    HiveFiles hive_files = collectHiveFiles(num_streams, query_info, hive_table_metadata, fs, context_);
+    if (hive_files.empty())
         return {};

     auto sources_info = std::make_shared<StorageHiveSource::SourcesInfo>();
     sources_info->hive_files = std::move(hive_files);
-    sources_info->database = hive_database;
+    sources_info->database_name = hive_database;
     sources_info->table_name = hive_table;
     sources_info->hive_metastore_client = hive_metastore_client;
     sources_info->partition_name_types = partition_name_types;
@@ -705,6 +711,62 @@ Pipe StorageHive::read(
     return Pipe::unitePipes(std::move(pipes));
 }

+HiveFiles StorageHive::collectHiveFiles(
+    unsigned max_threads,
+    const SelectQueryInfo & query_info,
+    const HiveTableMetadataPtr & hive_table_metadata,
+    const HDFSFSPtr & fs,
+    const ContextPtr & context_,
+    PruneLevel prune_level) const
+{
+    std::vector<Apache::Hadoop::Hive::Partition> partitions = hive_table_metadata->getPartitions();
+    /// Hive table have no partition
+    if (!partition_name_types.empty() && partitions.empty())
+        return {};
+
+    /// Hive files to collect
+    HiveFiles hive_files;
+    /// Mutext to protect hive_files, which maybe appended in multiple threads
+    std::mutex hive_files_mutex;
+    ThreadPool pool{max_threads};
+    if (!partitions.empty())
+    {
+        for (const auto & partition : partitions)
+        {
+            pool.scheduleOrThrowOnError(
+                [&]()
+                {
+                    auto hive_files_in_partition
+                        = collectHiveFilesFromPartition(partition, query_info, hive_table_metadata, fs, context_, prune_level);
+                    if (!hive_files_in_partition.empty())
+                    {
+                        std::lock_guard<std::mutex> lock(hive_files_mutex);
+                        hive_files.insert(std::end(hive_files), std::begin(hive_files_in_partition), std::end(hive_files_in_partition));
+                    }
+                });
+        }
+    }
+    else /// Partition keys is empty but still have files
+    {
+        auto file_infos = listDirectory(hive_table_metadata->getTable()->sd.location, hive_table_metadata, fs);
+        for (const auto & file_info : file_infos)
+        {
+            pool.scheduleOrThrowOnError(
+                [&]()
+                {
+                    auto hive_file = getHiveFileIfNeeded(file_info, {}, query_info, hive_table_metadata, context_, prune_level);
+                    if (hive_file)
+                    {
+                        std::lock_guard<std::mutex> lock(hive_files_mutex);
+                        hive_files.push_back(hive_file);
+                    }
+                });
+        }
+    }
+    pool.wait();
+    return hive_files;
+}
+
 SinkToStoragePtr StorageHive::write(const ASTPtr & /*query*/, const StorageMetadataPtr & /* metadata_snapshot*/, ContextPtr /*context*/)
 {
     throw Exception("Method write is not implemented for StorageHive", ErrorCodes::NOT_IMPLEMENTED);
@@ -717,6 +779,44 @@ NamesAndTypesList StorageHive::getVirtuals() const
         {"_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())}};
 }

+std::optional<UInt64> StorageHive::totalRows(const Settings & settings) const
+{
+    /// query_info is not used when prune_level == PruneLevel::None
+    SelectQueryInfo query_info;
+    return totalRowsImpl(settings, query_info, getContext(), PruneLevel::None);
+}
+
+std::optional<UInt64> StorageHive::totalRowsByPartitionPredicate(const SelectQueryInfo & query_info, ContextPtr context_) const
+{
+    return totalRowsImpl(context_->getSettingsRef(), query_info, context_, PruneLevel::Partition);
+}
+
+std::optional<UInt64>
+StorageHive::totalRowsImpl(const Settings & settings, const SelectQueryInfo & query_info, ContextPtr context_, PruneLevel prune_level) const
+{
+    /// Row-based format like Text doesn't support totalRowsByPartitionPredicate
+    if (!isColumnOriented())
+        return {};
+
+    auto hive_metastore_client = HiveMetastoreClientFactory::instance().getOrCreate(hive_metastore_url);
+    auto hive_table_metadata = hive_metastore_client->getTableMetadata(hive_database, hive_table);
+    HDFSBuilderWrapper builder = createHDFSBuilder(hdfs_namenode_url, getContext()->getGlobalContext()->getConfigRef());
+    HDFSFSPtr fs = createHDFSFS(builder.get());
+    HiveFiles hive_files = collectHiveFiles(settings.max_threads, query_info, hive_table_metadata, fs, context_, prune_level);
+
+    UInt64 total_rows = 0;
+    for (const auto & hive_file : hive_files)
+    {
+        auto file_rows = hive_file->getRows();
+        if (!file_rows)
+            throw Exception(
+                ErrorCodes::LOGICAL_ERROR, "Rows of hive file:{} with format:{} not initialized", hive_file->getPath(), format_name);
+        total_rows += *file_rows;
+    }
+    return total_rows;
+}
+
+
 void registerStorageHive(StorageFactory & factory)
 {
     factory.registerStorage(
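Note: the hunks above appear to be StorageHive.cpp. The new PruneLevel parameter lets the same collection code serve different callers: read() prunes up to Split, totalRows() uses PruneLevel::None, and totalRowsByPartitionPredicate() uses PruneLevel::Partition; the answer is then the sum of the cached per-file row counts. A small sketch of that final summation follows, under the assumption that every collected file can report a count; the function name and types are illustrative, not the real API.

// Sketch only: sum memoized per-file row counts, failing loudly if one is missing.
#include <cstdint>
#include <optional>
#include <stdexcept>
#include <vector>

uint64_t sumTotalRows(const std::vector<std::optional<uint64_t>> & per_file_rows)
{
    uint64_t total_rows = 0;
    for (const auto & rows : per_file_rows)
    {
        if (!rows)
            throw std::runtime_error("row count not initialized for a hive file"); // mirrors the LOGICAL_ERROR above
        total_rows += *rows;
    }
    return total_rows;
}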
@@ -26,7 +26,6 @@ class HiveSettings;
 class StorageHive final : public shared_ptr_helper<StorageHive>, public IStorage, WithContext
 {
     friend struct shared_ptr_helper<StorageHive>;
-
 public:
     String getName() const override { return "Hive"; }

@@ -39,7 +38,6 @@ public:
         return true;
     }

-
     Pipe read(
         const Names & column_names,
         const StorageSnapshotPtr & storage_snapshot,
@@ -55,6 +53,9 @@ public:

     bool isColumnOriented() const override;

+    std::optional<UInt64> totalRows(const Settings & settings) const override;
+    std::optional<UInt64> totalRowsByPartitionPredicate(const SelectQueryInfo & query_info, ContextPtr context_) const override;
+
 protected:
     friend class StorageHiveSource;
     StorageHive(
@@ -74,31 +75,64 @@ private:
     using FileInfo = HiveMetastoreClient::FileInfo;
     using HiveTableMetadataPtr = HiveMetastoreClient::HiveTableMetadataPtr;

+    enum class PruneLevel
+    {
+        None, /// Do not prune
+        Partition,
+        File,
+        Split,
+        Max = Split,
+    };
+
+    static String pruneLevelToString(PruneLevel level)
+    {
+        return String(magic_enum::enum_name(level));
+    }
+
     static ASTPtr extractKeyExpressionList(const ASTPtr & node);

-    static std::vector<FileInfo> listDirectory(const String & path, HiveTableMetadataPtr hive_table_metadata, const HDFSFSPtr & fs);
+    static std::vector<FileInfo> listDirectory(const String & path, const HiveTableMetadataPtr & hive_table_metadata, const HDFSFSPtr & fs);
+
     void initMinMaxIndexExpression();

-    std::vector<HiveFilePtr> collectHiveFilesFromPartition(
-        const Apache::Hadoop::Hive::Partition & partition,
-        SelectQueryInfo & query_info,
-        HiveTableMetadataPtr hive_table_metadata,
+    HiveFiles collectHiveFiles(
+        unsigned max_threads,
+        const SelectQueryInfo & query_info,
+        const HiveTableMetadataPtr & hive_table_metadata,
         const HDFSFSPtr & fs,
-        ContextPtr context_);
+        const ContextPtr & context_,
+        PruneLevel prune_level = PruneLevel::Max) const;

-    HiveFilePtr
-    createHiveFileIfNeeded(const FileInfo & file_info, const FieldVector & fields, SelectQueryInfo & query_info, ContextPtr context_);
+    HiveFiles collectHiveFilesFromPartition(
+        const Apache::Hadoop::Hive::Partition & partition,
+        const SelectQueryInfo & query_info,
+        const HiveTableMetadataPtr & hive_table_metadata,
+        const HDFSFSPtr & fs,
+        const ContextPtr & context_,
+        PruneLevel prune_level = PruneLevel::Max) const;
+
+    HiveFilePtr getHiveFileIfNeeded(
+        const FileInfo & file_info,
+        const FieldVector & fields,
+        const SelectQueryInfo & query_info,
+        const HiveTableMetadataPtr & hive_table_metadata,
+        const ContextPtr & context_,
+        PruneLevel prune_level = PruneLevel::Max) const;

     void getActualColumnsToRead(Block & sample_block, const Block & header_block, const NameSet & partition_columns) const;

+    void lazyInitialize();
+
+    std::optional<UInt64>
+    totalRowsImpl(const Settings & settings, const SelectQueryInfo & query_info, ContextPtr context_, PruneLevel prune_level) const;
+
     String hive_metastore_url;

     /// Hive database and table
     String hive_database;
     String hive_table;

-    std::mutex init_mutex;
+    mutable std::mutex init_mutex;
     bool has_initialized = false;

     /// Hive table meta
@@ -123,9 +157,8 @@ private:
     std::shared_ptr<HiveSettings> storage_settings;

     Poco::Logger * log = &Poco::Logger::get("StorageHive");

-    void lazyInitialize();
 };

 }

 #endif