Merge branch 'hive_table' of https://github.com/bigo-sg/ClickHouse into bigo_hive_table

This commit is contained in:
taiyang-li 2021-12-21 15:01:23 +08:00
commit cff3c20742
2 changed files with 52 additions and 12 deletions

View File

@ -85,16 +85,29 @@ public:
return setImpl(key, mapped, lock);
}
/// Returns std::pair of the cached value and a bool indicating whether the value was produced during this call.
template <typename LoadFunc>
std::pair<MappedPtr, bool> getOrSet(const Key & key, LoadFunc && load_func)
{
MappedPtr value = nullptr;
bool is_value_loaded = false, is_value_updated = false;
std::tie(value, is_value_loaded, is_value_updated) = getOrTrySet(key, std::move(load_func));
return std::make_pair(value, is_value_loaded);
}
/// If the value for the key is in the cache, returns it. If it is not, calls load_func() to
/// produce it, saves the result in the cache and returns it.
/// Only one of several concurrent threads calling getOrSet() will call load_func(),
/// Only one of several concurrent threads calling getOrTrySet() will call load_func(),
/// others will wait for that call to complete and will use its result (this helps prevent cache stampede).
/// Exceptions occurring in load_func will be propagated to the caller. Another thread from the
/// set of concurrent threads will then try to call its load_func etc.
///
/// Returns std::pair of the cached value and a bool indicating whether the value was produced during this call.
/// return std::tuple is <MappedPtr, is_value_loaded, is_value_updated>, where
/// - is_value_loaded indicates whether the value was produce during this call
/// - is_value_updated indicates whether the value is updated in the cache when is_value_loaded = true.
/// if is_value_loaded = false, is_value_updated = false
template <typename LoadFunc>
std::pair<MappedPtr, bool> getOrSet(const Key & key, LoadFunc && load_func)
std::tuple<MappedPtr, bool, bool> getOrTrySet(const Key &key, LoadFunc && load_func)
{
InsertTokenHolder token_holder;
{
@ -104,7 +117,7 @@ public:
if (val)
{
++hits;
return std::make_pair(val, false);
return {val, false, false};
}
auto & token = insert_tokens[key];
@ -124,7 +137,7 @@ public:
{
/// Another thread already produced the value while we waited for token->mutex.
++hits;
return std::make_pair(token->value, false);
return {token->value, false, false};
}
++misses;
@ -134,20 +147,41 @@ public:
/// Insert the new value only if the token is still in present in insert_tokens.
/// (The token may be absent because of a concurrent reset() call).
bool result = false;
bool is_value_loaded = false;
bool is_value_updated = false;
auto token_it = insert_tokens.find(key);
if (token_it != insert_tokens.end() && token_it->second.get() == token)
{
// setImpl() may fail, but the final behavior seems not be affected
// next call of getOrSet() will still call load_func()
setImpl(key, token->value, cache_lock);
result = true;
// next call of getOrTrySet() will still call load_func()
is_value_updated = setImpl(key, token->value, cache_lock);
is_value_loaded = true;
}
if (!token->cleaned_up)
token_holder.cleanup(token_lock, cache_lock);
return {token->value, is_value_loaded, is_value_updated};
}
return std::make_pair(token->value, result);
/// If key is not in cache or the element can be released, return is true. otherwise, return is false
bool tryDel(const Key & key)
{
std::lock_guard loc(mutex);
auto it = cells.find(key);
if (it == cells.end())
return true;
auto & cell = it->second;
if (cell.value)
{
if (!evict_policy.canRelease(*cell.value))
return false;
evict_policy.release(*cell.value);
}
current_size -= cell.size;
cells.erase(it);
queue.erase(cell.queue_iterator);
return true;
}
void getStats(size_t & out_hits, size_t & out_misses) const
@ -384,7 +418,7 @@ private:
current_weight_lost += cell.size;
cells.erase(it);
queue.pop_front();
key_it = queue.erase(key_it);
--queue_size;
}
else

View File

@ -13,6 +13,7 @@ namespace ErrorCodes
{
extern const int NO_HIVEMETASTORE;
extern const int BAD_ARGUMENTS;
extern const int LOGICAL_ERROR;
}
bool HiveMetastoreClient::shouldUpdateTableMetadata(
@ -110,7 +111,12 @@ void HiveMetastoreClient::clearTableMetadata(const String & db_name, const Strin
std::lock_guard lock{mutex};
HiveTableMetadataPtr metadata = table_metadata_cache.get(cache_key);
if (metadata)
table_metadata_cache.set(cache_key, nullptr);
{
if (!table_metadata_cache.tryDel(cache_key))
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Try to clear table metadata failed.");
}
}
}
void HiveMetastoreClient::setClient(std::shared_ptr<Apache::Hadoop::Hive::ThriftHiveMetastoreClient> client_)