Use connection pool in HiveMetastoreClient

1. remove lock for hive metastore client access
2. auto reconnect when connection is broken
This commit is contained in:
lgbo-ustc 2022-02-28 15:11:38 +08:00 committed by liangjiabiao
parent db69ab9d17
commit 2176d74cd1
2 changed files with 130 additions and 67 deletions

View File

@ -1,3 +1,4 @@
#include <memory>
#include <Storages/Hive/HiveCommon.h>
#if USE_HIVE
@ -5,6 +6,7 @@
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/transport/TSocket.h>
#include <Storages/Hive/HiveFile.h>
namespace DB
@ -15,6 +17,8 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
/// Out-of-class definition of the pool's static constant. It is handed to the PoolBase
/// constructor as its first argument; presumably the cap on pooled connections -- TODO confirm against PoolBase.
const unsigned ThriftHiveMetastoreClientPool::max_connections = 16;
bool HiveMetastoreClient::shouldUpdateTableMetadata(
const String & db_name, const String & table_name, const std::vector<Apache::Hadoop::Hive::Partition> & partitions)
{
@ -40,25 +44,42 @@ bool HiveMetastoreClient::shouldUpdateTableMetadata(
return false;
}
void HiveMetastoreClient::tryCallHiveClient(std::function<void(ThriftHiveMetastoreClientPool::Entry &)> func)
{
int i = 0;
String err_msg;
for (; i < max_retry; ++i)
{
auto client = client_pool.get(get_client_timeout);
try
{
func(client);
}
catch (apache::thrift::transport::TTransportException & e)
{
client.expire();
err_msg = e.what();
continue;
}
break;
}
if (i >= max_retry)
throw Exception(ErrorCodes::NO_HIVEMETASTORE, "Hive Metastore expired because {}", err_msg);
}
HiveMetastoreClient::HiveTableMetadataPtr HiveMetastoreClient::getTableMetadata(const String & db_name, const String & table_name)
{
LOG_TRACE(log, "Get table metadata for {}.{}", db_name, table_name);
std::lock_guard lock{mutex};
auto table = std::make_shared<Apache::Hadoop::Hive::Table>();
std::vector<Apache::Hadoop::Hive::Partition> partitions;
try
auto client_call = [&](ThriftHiveMetastoreClientPool::Entry & client)
{
client->get_table(*table, db_name, table_name);
/// Query the latest partition info to check new change.
client->get_partitions(partitions, db_name, table_name, -1);
}
catch (apache::thrift::transport::TTransportException & e)
{
setExpired();
throw Exception(ErrorCodes::NO_HIVEMETASTORE, "Hive Metastore expired because {}", String(e.what()));
}
};
tryCallHiveClient(client_call);
bool update_cache = shouldUpdateTableMetadata(db_name, table_name, partitions);
String cache_key = getCacheKey(db_name, table_name);
@ -103,23 +124,26 @@ HiveMetastoreClient::HiveTableMetadataPtr HiveMetastoreClient::getTableMetadata(
return metadata;
}
std::shared_ptr<Apache::Hadoop::Hive::Table> HiveMetastoreClient::getHiveTable(const String & db_name, const String & table_name)
{
auto table = std::make_shared<Apache::Hadoop::Hive::Table>();
auto client_call = [&](ThriftHiveMetastoreClientPool::Entry & client)
{
client->get_table(*table, db_name, table_name);
};
tryCallHiveClient(client_call);
return table;
}
/// Removes the cached metadata entry for `db_name`.`table_name` from `table_metadata_cache`, if one exists.
void HiveMetastoreClient::clearTableMetadata(const String & db_name, const String & table_name)
{
/// Cache entries are keyed by "<db_name>.<table_name>" (see getCacheKey).
String cache_key = getCacheKey(db_name, table_name);
std::lock_guard lock{mutex};
/// Look the entry up first and only call remove() when it is actually cached.
HiveTableMetadataPtr metadata = table_metadata_cache.get(cache_key);
if (metadata)
table_metadata_cache.remove(cache_key);
}
/// Replaces the stored Thrift client with `client_` and resets the `expired` flag,
/// both while holding `mutex`.
void HiveMetastoreClient::setClient(std::shared_ptr<Apache::Hadoop::Hive::ThriftHiveMetastoreClient> client_)
{
std::lock_guard lock{mutex};
client = client_;
/// A fresh client was just installed, so the object is no longer considered expired.
clearExpired();
}
bool HiveMetastoreClient::PartitionInfo::haveSameParameters(const Apache::Hadoop::Hive::Partition & other) const
{
/// Parameters include keys:numRows,numFiles,rawDataSize,totalSize,transient_lastDdlTime
@ -192,53 +216,55 @@ HiveMetastoreClientFactory & HiveMetastoreClientFactory::instance()
return factory;
}
using namespace apache::thrift;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using namespace Apache::Hadoop::Hive;
HiveMetastoreClientPtr HiveMetastoreClientFactory::getOrCreate(const String & name, ContextPtr context)
{
using namespace apache::thrift;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using namespace Apache::Hadoop::Hive;
std::lock_guard lock(mutex);
auto it = clients.find(name);
if (it == clients.end() || it->second->isExpired())
if (it == clients.end())
{
/// Connect to hive metastore
Poco::URI hive_metastore_url(name);
const auto & host = hive_metastore_url.getHost();
auto port = hive_metastore_url.getPort();
std::shared_ptr<TSocket> socket = std::make_shared<TSocket>(host, port);
socket->setKeepAlive(true);
socket->setConnTimeout(conn_timeout_ms);
socket->setRecvTimeout(recv_timeout_ms);
socket->setSendTimeout(send_timeout_ms);
std::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
std::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
std::shared_ptr<ThriftHiveMetastoreClient> thrift_client = std::make_shared<ThriftHiveMetastoreClient>(protocol);
try
auto builder = [name]()
{
transport->open();
}
catch (TException & tx)
{
throw Exception("connect to hive metastore:" + name + " failed." + tx.what(), ErrorCodes::BAD_ARGUMENTS);
}
if (it == clients.end())
{
HiveMetastoreClientPtr client = std::make_shared<HiveMetastoreClient>(std::move(thrift_client), context);
clients[name] = client;
return client;
}
else
{
it->second->setClient(std::move(thrift_client));
return it->second;
}
return createThriftHiveMetastoreClient(name);
};
auto client = std::make_shared<HiveMetastoreClient>(builder, context->getGlobalContext());
clients[name] = client;
return client;
}
return it->second;
}
/// Socket timeouts (milliseconds, going by the `_ms` suffix) that createThriftHiveMetastoreClient()
/// applies to every metastore socket via setConnTimeout / setRecvTimeout / setSendTimeout.
const int HiveMetastoreClientFactory::conn_timeout_ms = 10000;
const int HiveMetastoreClientFactory::recv_timeout_ms = 10000;
const int HiveMetastoreClientFactory::send_timeout_ms = 10000;
std::shared_ptr<ThriftHiveMetastoreClient> HiveMetastoreClientFactory::createThriftHiveMetastoreClient(const String &name)
{
Poco::URI hive_metastore_url(name);
const auto & host = hive_metastore_url.getHost();
auto port = hive_metastore_url.getPort();
std::shared_ptr<TSocket> socket = std::make_shared<TSocket>(host, port);
socket->setKeepAlive(true);
socket->setConnTimeout(conn_timeout_ms);
socket->setRecvTimeout(recv_timeout_ms);
socket->setSendTimeout(send_timeout_ms);
std::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
std::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
std::shared_ptr<ThriftHiveMetastoreClient> thrift_client = std::make_shared<ThriftHiveMetastoreClient>(protocol);
try
{
transport->open();
}
catch (TException & tx)
{
throw Exception("connect to hive metastore:" + name + " failed." + tx.what(), ErrorCodes::BAD_ARGUMENTS);
}
return thrift_client;
}
}
#endif

View File

@ -1,5 +1,6 @@
#pragma once
#include <memory>
#include <Common/config.h>
#if USE_HIVE
@ -10,12 +11,40 @@
#include <base/types.h>
#include <Common/LRUCache.h>
#include <Common/PoolBase.h>
#include <Storages/HDFS/HDFSCommon.h>
namespace DB
{
using ThriftHiveMetastoreClientBuilder = std::function<std::shared_ptr<Apache::Hadoop::Hive::ThriftHiveMetastoreClient>()>;
/// Pool of Thrift Hive metastore clients built on top of PoolBase.
/// New pooled objects are produced by the builder callback supplied at construction.
class ThriftHiveMetastoreClientPool : public PoolBase<Apache::Hadoop::Hive::ThriftHiveMetastoreClient>
{
public:
    using Object = Apache::Hadoop::Hive::ThriftHiveMetastoreClient;
    using ObjectPtr = std::shared_ptr<Object>;
    using Entry = PoolBase<Object>::Entry;

    /// `builder_` is stored and invoked from allocObject() to create each pooled client.
    explicit ThriftHiveMetastoreClientPool(ThriftHiveMetastoreClientBuilder builder_)
        : PoolBase<Object>(max_connections, &Poco::Logger::get("ThriftHiveMetastoreClientPool"))
        , builder(builder_)
    {
    }

protected:
    /// PoolBase hook: delegate object creation to the user-supplied builder.
    ObjectPtr allocObject() override { return builder(); }

private:
    ThriftHiveMetastoreClientBuilder builder;

    /// Only declared here; the value is defined out of class.
    const static unsigned max_connections;
};
class HiveMetastoreClient : public WithContext
{
public:
@ -26,7 +55,9 @@ public:
UInt64 last_modify_time; /// In ms
size_t size;
FileInfo() = default;
explicit FileInfo() = default;
FileInfo & operator = (const FileInfo &) = default;
FileInfo(const FileInfo &) = default;
FileInfo(const String & path_, UInt64 last_modify_time_, size_t size_)
: path(path_), last_modify_time(last_modify_time_), size(size_)
{
@ -94,17 +125,18 @@ public:
using HiveTableMetadataPtr = std::shared_ptr<HiveMetastoreClient::HiveTableMetadata>;
explicit HiveMetastoreClient(std::shared_ptr<Apache::Hadoop::Hive::ThriftHiveMetastoreClient> client_, ContextPtr context_)
: WithContext(context_), client(client_), table_metadata_cache(1000)
explicit HiveMetastoreClient(ThriftHiveMetastoreClientBuilder builder_, ContextPtr context_)
: WithContext(context_)
, table_metadata_cache(1000)
, client_pool(builder_)
{
}
HiveTableMetadataPtr getTableMetadata(const String & db_name, const String & table_name);
// Access hive table information by hive client
std::shared_ptr<Apache::Hadoop::Hive::Table> getHiveTable(const String & db_name, const String & table_name);
void clearTableMetadata(const String & db_name, const String & table_name);
void setClient(std::shared_ptr<Apache::Hadoop::Hive::ThriftHiveMetastoreClient> client_);
bool isExpired() const { return expired; }
void setExpired() { expired = true; }
void clearExpired() { expired = false; }
private:
static String getCacheKey(const String & db_name, const String & table_name) { return db_name + "." + table_name; }
@ -112,12 +144,15 @@ private:
bool shouldUpdateTableMetadata(
const String & db_name, const String & table_name, const std::vector<Apache::Hadoop::Hive::Partition> & partitions);
std::shared_ptr<Apache::Hadoop::Hive::ThriftHiveMetastoreClient> client;
void tryCallHiveClient(std::function<void(ThriftHiveMetastoreClientPool::Entry &)> func);
LRUCache<String, HiveTableMetadata> table_metadata_cache;
mutable std::mutex mutex;
std::atomic<bool> expired{false};
ThriftHiveMetastoreClientPool client_pool;
Poco::Logger * log = &Poco::Logger::get("HiveMetastoreClient");
const int max_retry = 3;
const UInt64 get_client_timeout = 1000000;
};
using HiveMetastoreClientPtr = std::shared_ptr<HiveMetastoreClient>;
@ -128,13 +163,15 @@ public:
HiveMetastoreClientPtr getOrCreate(const String & name, ContextPtr context);
static std::shared_ptr<Apache::Hadoop::Hive::ThriftHiveMetastoreClient> createThriftHiveMetastoreClient(const String & name);
private:
std::mutex mutex;
std::map<String, HiveMetastoreClientPtr> clients;
const int conn_timeout_ms = 10000;
const int recv_timeout_ms = 10000;
const int send_timeout_ms = 10000;
const static int conn_timeout_ms;
const static int recv_timeout_ms;
const static int send_timeout_ms;
};
}