refactor HiveTableMetadata

This commit is contained in:
taiyang-li 2022-04-08 23:04:24 +08:00
parent 2e6f0db825
commit 2c99ef0ecc
3 changed files with 94 additions and 92 deletions

View File

@ -29,31 +29,6 @@ ThriftHiveMetastoreClientPool::ThriftHiveMetastoreClientPool(ThriftHiveMetastore
{
}
bool HiveMetastoreClient::shouldUpdateTableMetadata(
const String & db_name, const String & table_name, const std::vector<Apache::Hadoop::Hive::Partition> & partitions)
{
String cache_key = getCacheKey(db_name, table_name);
HiveTableMetadataPtr metadata = table_metadata_cache.get(cache_key);
if (!metadata)
return true;
auto old_partiton_infos = metadata->getPartitionInfos();
if (old_partiton_infos.size() != partitions.size())
return true;
for (const auto & partition : partitions)
{
auto it = old_partiton_infos.find(partition.sd.location);
if (it == old_partiton_infos.end())
return true;
const auto & old_partition_info = it->second;
if (!old_partition_info.haveSameParameters(partition))
return true;
}
return false;
}
void HiveMetastoreClient::tryCallHiveClient(std::function<void(ThriftHiveMetastoreClientPool::Entry &)> func)
{
int i = 0;
@ -91,44 +66,17 @@ HiveMetastoreClient::HiveTableMetadataPtr HiveMetastoreClient::getTableMetadata(
};
tryCallHiveClient(client_call);
bool update_cache = shouldUpdateTableMetadata(db_name, table_name, partitions);
// bool update_cache = shouldUpdateTableMetadata(db_name, table_name, partitions);
String cache_key = getCacheKey(db_name, table_name);
HiveTableMetadataPtr metadata = table_metadata_cache.get(cache_key);
if (update_cache)
{
LOG_INFO(log, "Reload hive partition metadata info for {}.{}", db_name, table_name);
/// Generate partition infos from partitions and old partition infos(if exists).
std::map<String, PartitionInfo> new_partition_infos;
if (metadata)
{
auto & old_partiton_infos = metadata->getPartitionInfos();
for (const auto & partition : partitions)
{
auto it = old_partiton_infos.find(partition.sd.location);
if (it == old_partiton_infos.end() || !it->second.haveSameParameters(partition) || !it->second.initialized)
{
new_partition_infos.emplace(partition.sd.location, PartitionInfo(partition));
continue;
metadata->updateIfNeeded(partitions);
}
else
{
PartitionInfo new_partition_info(partition);
new_partition_info.files = std::move(it->second.files);
new_partition_info.initialized = true;
}
}
}
else
{
for (const auto & partition : partitions)
new_partition_infos.emplace(partition.sd.location, PartitionInfo(partition));
}
metadata = std::make_shared<HiveMetastoreClient::HiveTableMetadata>(
db_name, table_name, table, std::move(new_partition_infos));
metadata = std::make_shared<HiveTableMetadata>(db_name, table_name, table, partitions);
table_metadata_cache.set(cache_key, metadata);
}
return metadata;
@ -157,14 +105,14 @@ void HiveMetastoreClient::clearTableMetadata(const String & db_name, const Strin
bool HiveMetastoreClient::PartitionInfo::haveSameParameters(const Apache::Hadoop::Hive::Partition & other) const
{
/// Parameters include keys:numRows,numFiles,rawDataSize,totalSize,transient_lastDdlTime
auto it1 = partition.parameters.cbegin();
auto it2 = other.parameters.cbegin();
for (; it1 != partition.parameters.cend() && it2 != other.parameters.cend(); ++it1, ++it2)
auto it = partition.parameters.cbegin();
auto oit = other.parameters.cbegin();
for (; it != partition.parameters.cend() && oit != other.parameters.cend(); ++it, ++oit)
{
if (it1->first != it2->first || it1->second != it2->second)
if (it->first != oit->first || it->second != oit->second)
return false;
}
return (it1 == partition.parameters.cend() && it2 == other.parameters.cend());
return (it == partition.parameters.cend() && oit == other.parameters.cend());
}
std::vector<Apache::Hadoop::Hive::Partition> HiveMetastoreClient::HiveTableMetadata::getPartitions() const
@ -221,6 +169,57 @@ std::vector<HiveMetastoreClient::FileInfo> HiveMetastoreClient::HiveTableMetadat
return result;
}
HiveFilesCachePtr HiveMetastoreClient::HiveTableMetadata::getHiveFilesCache() const
{
return hive_files_cache;
}
void HiveMetastoreClient::HiveTableMetadata::updateIfNeeded(const std::vector<Apache::Hadoop::Hive::Partition> & partitions)
{
std::lock_guard lock{mutex};
if (!shouldUpdate(partitions))
return;
std::map<String, PartitionInfo> new_partition_infos;
auto & old_partiton_infos = partition_infos;
for (const auto & partition : partitions)
{
auto it = old_partiton_infos.find(partition.sd.location);
if (it == old_partiton_infos.end() || !it->second.haveSameParameters(partition) || !it->second.initialized)
{
new_partition_infos.emplace(partition.sd.location, PartitionInfo(partition));
continue;
}
else
{
new_partition_infos.emplace(partition.sd.location, std::move(it->second));
}
}
partition_infos.swap(new_partition_infos);
}
bool HiveMetastoreClient::HiveTableMetadata::shouldUpdate(const std::vector<Apache::Hadoop::Hive::Partition> & partitions)
{
const auto & old_partiton_infos = partition_infos;
if (old_partiton_infos.size() != partitions.size())
return false;
for (const auto & partition : partitions)
{
auto it = old_partiton_infos.find(partition.sd.location);
if (it == old_partiton_infos.end())
return true;
const auto & old_partition_info = it->second;
if (!old_partition_info.haveSameParameters(partition))
return true;
}
return false;
}
HiveMetastoreClientFactory & HiveMetastoreClientFactory::instance()
{
static HiveMetastoreClientFactory factory;
@ -234,7 +233,6 @@ using namespace Apache::Hadoop::Hive;
HiveMetastoreClientPtr HiveMetastoreClientFactory::getOrCreate(const String & name)
{
std::lock_guard lock(mutex);
auto it = clients.find(name);
if (it == clients.end())
@ -244,11 +242,12 @@ HiveMetastoreClientPtr HiveMetastoreClientFactory::getOrCreate(const String & na
return createThriftHiveMetastoreClient(name);
};
auto client = std::make_shared<HiveMetastoreClient>(builder);
clients[name] = client;
clients.emplace(name, client);
return client;
}
return it->second;
}
std::shared_ptr<ThriftHiveMetastoreClient> HiveMetastoreClientFactory::createThriftHiveMetastoreClient(const String &name)
{
Poco::URI hive_metastore_url(name);

View File

@ -13,6 +13,7 @@
#include <Common/LRUCache.h>
#include <Common/PoolBase.h>
#include <Storages/HDFS/HDFSCommon.h>
#include <Storages/Hive/HiveFile.h>
namespace DB
@ -40,7 +41,6 @@ private:
class HiveMetastoreClient
{
public:
struct FileInfo
{
String path;
@ -63,57 +63,61 @@ public:
bool initialized = false; /// If true, files are initialized.
explicit PartitionInfo(const Apache::Hadoop::Hive::Partition & partition_): partition(partition_) {}
PartitionInfo(PartitionInfo &&) = default;
bool haveSameParameters(const Apache::Hadoop::Hive::Partition & other) const;
};
class HiveTableMetadata;
using HiveTableMetadataPtr = std::shared_ptr<HiveTableMetadata>;
/// Used for speeding up metadata query process.
struct HiveTableMetadata
class HiveTableMetadata : boost::noncopyable
{
public:
HiveTableMetadata(
const String & db_name_,
const String & table_name_,
std::shared_ptr<Apache::Hadoop::Hive::Table> table_,
const std::map<String, PartitionInfo> & partition_infos_)
const std::vector<Apache::Hadoop::Hive::Partition> & partitions_)
: db_name(db_name_)
, table_name(table_name_)
, table(table_)
, partition_infos(partition_infos_)
, table(std::move(table_))
, empty_partition_keys(table->partitionKeys.empty())
, hive_files_cache(std::make_shared<HiveFilesCache>(10000))
{
std::lock_guard lock(mutex);
for (const auto & partition : partitions_)
partition_infos.emplace(partition.sd.location, PartitionInfo(partition));
}
std::map<String, PartitionInfo> & getPartitionInfos()
{
std::lock_guard lock{mutex};
return partition_infos;
}
std::shared_ptr<Apache::Hadoop::Hive::Table> getTable() const
{
std::lock_guard lock{mutex};
return table;
}
std::shared_ptr<Apache::Hadoop::Hive::Table> getTable() const { return table; }
std::vector<Apache::Hadoop::Hive::Partition> getPartitions() const;
std::vector<FileInfo> getFilesByLocation(const HDFSFSPtr & fs, const String & location);
private:
String db_name;
String table_name;
HiveFilesCachePtr getHiveFilesCache() const;
void updateIfNeeded(const std::vector<Apache::Hadoop::Hive::Partition> & partitions);
private:
bool shouldUpdate(const std::vector<Apache::Hadoop::Hive::Partition> & partitions);
const String db_name;
const String table_name;
const std::shared_ptr<Apache::Hadoop::Hive::Table> table;
/// Mutex to protect partition_infos.
mutable std::mutex mutex;
std::shared_ptr<Apache::Hadoop::Hive::Table> table;
std::map<String, PartitionInfo> partition_infos;
const bool empty_partition_keys;
const HiveFilesCachePtr hive_files_cache;
Poco::Logger * log = &Poco::Logger::get("HiveMetastoreClient");
};
using HiveTableMetadataPtr = std::shared_ptr<HiveMetastoreClient::HiveTableMetadata>;
explicit HiveMetastoreClient(ThriftHiveMetastoreClientBuilder builder_)
: table_metadata_cache(1000)
@ -129,9 +133,6 @@ public:
private:
static String getCacheKey(const String & db_name, const String & table_name) { return db_name + "." + table_name; }
bool shouldUpdateTableMetadata(
const String & db_name, const String & table_name, const std::vector<Apache::Hadoop::Hive::Partition> & partitions);
void tryCallHiveClient(std::function<void(ThriftHiveMetastoreClientPool::Entry &)> func);
LRUCache<String, HiveTableMetadata> table_metadata_cache;
@ -148,9 +149,9 @@ public:
HiveMetastoreClientPtr getOrCreate(const String & name);
private:
static std::shared_ptr<Apache::Hadoop::Hive::ThriftHiveMetastoreClient> createThriftHiveMetastoreClient(const String & name);
private:
std::mutex mutex;
std::map<String, HiveMetastoreClientPtr> clients;
};

View File

@ -166,6 +166,8 @@ protected:
using HiveFilePtr = std::shared_ptr<IHiveFile>;
using HiveFiles = std::vector<HiveFilePtr>;
using HiveFilesCache = LRUCache<String, HiveFilePtr>;
using HiveFilesCachePtr = std::shared_ptr<HiveFilesCache>;
class HiveTextFile : public IHiveFile
{