modifications based on pr review

This commit is contained in:
lgbo-ustc 2021-12-27 15:04:26 +08:00 committed by liangjiabiao
parent abe79fb1c2
commit 078521496a
7 changed files with 54 additions and 72 deletions

View File

@ -7,16 +7,10 @@
#include <mutex> #include <mutex>
#include <atomic> #include <atomic>
#include <base/logger_useful.h> #include <base/logger_useful.h>
#include <Common/Exception.h>
namespace DB namespace DB
{ {
namespace ErrorCodes
{
extern const int CANNOT_RELEASE;
}
template <typename T> template <typename T>
struct TrivialWeightFunction struct TrivialWeightFunction
{ {
@ -27,15 +21,13 @@ struct TrivialWeightFunction
}; };
template <typename T> template <typename T>
struct TrivialLRUCacheEvitPolicy struct TrivialLRUCacheEvictPolicy
{ {
// To note that the arg could be null
inline bool canRelease(std::shared_ptr<T>) const inline bool canRelease(std::shared_ptr<T>) const
{ {
return true; return true;
} }
// To note that the arg could be null
inline void release(std::shared_ptr<T>) inline void release(std::shared_ptr<T>)
{ {
} }
@ -51,13 +43,21 @@ template <typename TKey,
typename TMapped, typename TMapped,
typename HashFunction = std::hash<TKey>, typename HashFunction = std::hash<TKey>,
typename WeightFunction = TrivialWeightFunction<TMapped>, typename WeightFunction = TrivialWeightFunction<TMapped>,
typename EvictPolicy = TrivialLRUCacheEvitPolicy<TMapped>> typename EvictPolicy = TrivialLRUCacheEvictPolicy<TMapped>>
class LRUCache class LRUCache
{ {
public: public:
using Key = TKey; using Key = TKey;
using Mapped = TMapped; using Mapped = TMapped;
using MappedPtr = std::shared_ptr<Mapped>; using MappedPtr = std::shared_ptr<Mapped>;
struct Result
{
MappedPtr value;
bool cache_miss = true;
// set_successful is not trustworthy for getOrSet, because removeOverflow is called right after putting key in cache
bool set_successful = false;
};
/** Initialize LRUCache with max_size and max_elements_size. /** Initialize LRUCache with max_size and max_elements_size.
* max_elements_size == 0 means no elements size restrictions. * max_elements_size == 0 means no elements size restrictions.
@ -97,12 +97,11 @@ public:
return setImpl(key, mapped, lock); return setImpl(key, mapped, lock);
} }
/// Returns std::pair of the cached value and a bool indicating whether the value was produced during this call.
template <typename LoadFunc> template <typename LoadFunc>
std::pair<MappedPtr, bool> getOrSet(const Key & key, LoadFunc && load_func) std::pair<MappedPtr, bool> getOrSet(const Key & key, LoadFunc && load_func)
{ {
auto [value, is_loaded, _] = getOrTrySet(key, std::move(load_func)); auto result = getOrTrySet(key, std::move(load_func));
return std::make_pair(value, is_loaded); return std::make_pair(result.value, result.cache_miss);
} }
/// If the value for the key is in the cache, returns it. If it is not, calls load_func() to /// If the value for the key is in the cache, returns it. If it is not, calls load_func() to
@ -112,12 +111,8 @@ public:
/// Exceptions occurring in load_func will be propagated to the caller. Another thread from the /// Exceptions occurring in load_func will be propagated to the caller. Another thread from the
/// set of concurrent threads will then try to call its load_func etc. /// set of concurrent threads will then try to call its load_func etc.
/// ///
/// return std::tuple is <MappedPtr, is_value_loaded, is_value_updated>, where
/// - is_value_loaded indicates whether the value was produce during this call
/// - is_value_updated indicates whether the value is updated in the cache when is_value_loaded = true.
/// if is_value_loaded = false, is_value_updated = false
template <typename LoadFunc> template <typename LoadFunc>
std::tuple<MappedPtr, bool, bool> getOrTrySet(const Key &key, LoadFunc && load_func) Result getOrTrySet(const Key &key, LoadFunc && load_func)
{ {
InsertTokenHolder token_holder; InsertTokenHolder token_holder;
{ {
@ -353,7 +348,6 @@ private:
if (inserted) if (inserted)
{ {
auto value_weight = mapped ? weight_function(*mapped) : 0; auto value_weight = mapped ? weight_function(*mapped) : 0;
// move removeOverflow() ahead here. In default, the final result is the same as the old implementation
if (!removeOverflow(value_weight)) if (!removeOverflow(value_weight))
{ {
// cannot find enough space to put in the new value // cannot find enough space to put in the new value
@ -375,7 +369,7 @@ private:
{ {
if (!evict_policy.canRelease(cell.value)) if (!evict_policy.canRelease(cell.value))
{ {
// the old value is refered by someone, cannot release now // the old value is referred by someone, cannot release now
// in default policy, it is always true. // in default policy, it is always true.
return false; return false;
} }

View File

@ -41,23 +41,8 @@ RemoteReadBuffer::~RemoteReadBuffer()
file_cache_controller->deallocFile(std::move(file_buffer)); file_cache_controller->deallocFile(std::move(file_buffer));
} }
std::unique_ptr<ReadBuffer> RemoteReadBuffer::create(ContextPtr context, IRemoteFileMetadataPtr remote_file_metadata, std::unique_ptr<ReadBuffer> read_buffer) std::unique_ptr<ReadBuffer> RemoteReadBuffer::create(ContextPtr context, IRemoteFileMetadataPtr remote_file_metadata, std::unique_ptr<ReadBuffer> read_buffer, size_t buff_size)
{ {
auto * log = &Poco::Logger::get("RemoteReadBuffer");
size_t buff_size = DBMS_DEFAULT_BUFFER_SIZE;
if (read_buffer)
buff_size = read_buffer->internalBuffer().size();
/*
* in the new implement of ReadBufferFromHDFS, buffer size is 0.
*
* in the common case, we don't read bytes from readbuffer directly, so set buff_size = DBMS_DEFAULT_BUFFER_SIZE
* is OK.
*
* we need be careful with the case without local file reader.
*/
if (buff_size == 0)
buff_size = DBMS_DEFAULT_BUFFER_SIZE;
auto remote_path = remote_file_metadata->remote_path; auto remote_path = remote_file_metadata->remote_path;
auto remote_read_buffer = std::make_unique<RemoteReadBuffer>(buff_size); auto remote_read_buffer = std::make_unique<RemoteReadBuffer>(buff_size);
ErrorCodes::ErrorCode error; ErrorCodes::ErrorCode error;
@ -65,8 +50,6 @@ std::unique_ptr<ReadBuffer> RemoteReadBuffer::create(ContextPtr context, IRemote
std::tie(remote_read_buffer->file_cache_controller, read_buffer, error) = ExternalDataSourceCache::instance().createReader(context, remote_file_metadata, read_buffer); std::tie(remote_read_buffer->file_cache_controller, read_buffer, error) = ExternalDataSourceCache::instance().createReader(context, remote_file_metadata, read_buffer);
if (remote_read_buffer->file_cache_controller == nullptr) if (remote_read_buffer->file_cache_controller == nullptr)
{ {
LOG_ERROR(log, "Failed to allocate local file for remote path: {}, reason: {}.", remote_path, error);
// read_buffer is the input one.
return read_buffer; return read_buffer;
} }
else else
@ -120,7 +103,10 @@ off_t RemoteReadBuffer::getPosition()
ExternalDataSourceCache::ExternalDataSourceCache() = default; ExternalDataSourceCache::ExternalDataSourceCache() = default;
ExternalDataSourceCache::~ExternalDataSourceCache() = default; ExternalDataSourceCache::~ExternalDataSourceCache()
{
recover_task_holder->deactivate();
}
ExternalDataSourceCache & ExternalDataSourceCache::instance() ExternalDataSourceCache & ExternalDataSourceCache::instance()
{ {

View File

@ -34,7 +34,7 @@ class RemoteReadBuffer : public BufferWithOwnMemory<SeekableReadBufferWithSize>
public: public:
explicit RemoteReadBuffer(size_t buff_size); explicit RemoteReadBuffer(size_t buff_size);
~RemoteReadBuffer() override; ~RemoteReadBuffer() override;
static std::unique_ptr<ReadBuffer> create(ContextPtr contex, IRemoteFileMetadataPtr remote_file_metadata, std::unique_ptr<ReadBuffer> read_buffer); static std::unique_ptr<ReadBuffer> create(ContextPtr contex, IRemoteFileMetadataPtr remote_file_metadata, std::unique_ptr<ReadBuffer> read_buffer, size_t buff_size);
bool nextImpl() override; bool nextImpl() override;
off_t seek(off_t off, int whence) override; off_t seek(off_t off, int whence) override;

View File

@ -1,6 +1,8 @@
#include <Storages/Cache/RemoteCacheController.h> #include <Storages/Cache/RemoteCacheController.h>
#include <Storages/Cache/ExternalDataSourceCache.h> #include <Storages/Cache/ExternalDataSourceCache.h>
#include <Storages/Cache/RemoteFileMetadataFactory.h> #include <Storages/Cache/RemoteFileMetadataFactory.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadBuffer.h> #include <IO/ReadBuffer.h>
#include <IO/ReadSettings.h> #include <IO/ReadSettings.h>
#include <Poco/JSON/JSON.h> #include <Poco/JSON/JSON.h>
@ -15,20 +17,6 @@ namespace ErrorCodes
extern const int OK; extern const int OK;
extern const int BAD_ARGUMENTS; extern const int BAD_ARGUMENTS;
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
extern const int END_OF_FILE;
}
bool RemoteCacheController::loadInnerInformation(const fs::path & file_path)
{
if (!fs::exists(file_path))
return false;
std::ifstream info_file(file_path);
Poco::JSON::Parser info_parser;
auto info_json = info_parser.parse(info_file).extract<Poco::JSON::Object::Ptr>();
file_status = static_cast<LocalFileStatus>(info_json->get("file_status").convert<Int32>());
metadata_class = info_json->get("metadata_class").convert<String>();
info_file.close();
return true;
} }
std::shared_ptr<RemoteCacheController> RemoteCacheController::recover(const std::filesystem::path & local_path_) std::shared_ptr<RemoteCacheController> RemoteCacheController::recover(const std::filesystem::path & local_path_)
@ -37,13 +25,12 @@ std::shared_ptr<RemoteCacheController> RemoteCacheController::recover(const std:
if (!std::filesystem::exists(local_path_ / "data.bin")) if (!std::filesystem::exists(local_path_ / "data.bin"))
{ {
LOG_TRACE(log, "Invalid cached directory:{}", local_path_.string()); LOG_TRACE(log, "Invalid cached directory: {}", local_path_.string());
return nullptr; return nullptr;
} }
auto cache_controller = std::make_shared<RemoteCacheController>(nullptr, local_path_, 0); auto cache_controller = std::make_shared<RemoteCacheController>(nullptr, local_path_, 0);
if (!cache_controller->loadInnerInformation(local_path_ / "info.txt") if (cache_controller->file_status != DOWNLOADED)
|| cache_controller->file_status != DOWNLOADED)
{ {
LOG_INFO(log, "Recover cached file failed. local path:{}", local_path_.string()); LOG_INFO(log, "Recover cached file failed. local path:{}", local_path_.string());
return nullptr; return nullptr;
@ -67,12 +54,11 @@ std::shared_ptr<RemoteCacheController> RemoteCacheController::recover(const std:
local_path_.string()); local_path_.string());
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid metadata class:{}", cache_controller->metadata_class); throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid metadata class:{}", cache_controller->metadata_class);
} }
std::ifstream metadata_file(local_path_ / "metadata.txt"); ReadBufferFromFile file_readbuffer((local_path_ / "metadata.txt").string());
if (!cache_controller->file_metadata_ptr->fromString(std::string((std::istreambuf_iterator<char>(metadata_file)), std::string metadata_content;
std::istreambuf_iterator<char>()))) readStringUntilEOF(metadata_content, file_readbuffer);
if (!cache_controller->file_metadata_ptr->fromString(metadata_content))
{ {
LOG_ERROR(log, "Cannot load the metadata. The cached file is invalid and will be remove. path:{}",
local_path_.string());
throw Exception(ErrorCodes::LOGICAL_ERROR, "Invalid metadata file({}) for meta class {}", local_path_.string(), cache_controller->metadata_class); throw Exception(ErrorCodes::LOGICAL_ERROR, "Invalid metadata file({}) for meta class {}", local_path_.string(), cache_controller->metadata_class);
} }
@ -93,7 +79,7 @@ RemoteCacheController::RemoteCacheController(
, current_offset(0) , current_offset(0)
{ {
// on recover, file_metadata_ptr is null, but it will be allocated after loading from metadata.txt // on recover, file_metadata_ptr is null, but it will be allocated after loading from metadata.txt
// when we allocate a whole new file cache file_metadata_ptr must not be null. // when we allocate a whole new file cachefile_metadata_ptr must not be null.
if (file_metadata_ptr) if (file_metadata_ptr)
{ {
metadata_class = file_metadata_ptr->getName(); metadata_class = file_metadata_ptr->getName();
@ -102,9 +88,22 @@ RemoteCacheController::RemoteCacheController(
metadata_file_writer->write(str_buf.c_str(), str_buf.size()); metadata_file_writer->write(str_buf.c_str(), str_buf.size());
metadata_file_writer->close(); metadata_file_writer->close();
} }
else
{
auto info_path = local_path_ / "info.txt";
if (fs::exists(info_path))
{
std::ifstream info_file(info_path);
Poco::JSON::Parser info_parser;
auto info_json = info_parser.parse(info_file).extract<Poco::JSON::Object::Ptr>();
file_status = static_cast<LocalFileStatus>(info_json->get("file_status").convert<Int32>());
metadata_class = info_json->get("metadata_class").convert<String>();
info_file.close();
}
}
} }
ErrorCodes::ErrorCode RemoteCacheController::waitMoreData(size_t start_offset_, size_t end_offset_) void RemoteCacheController::waitMoreData(size_t start_offset_, size_t end_offset_)
{ {
std::unique_lock lock{mutex}; std::unique_lock lock{mutex};
if (file_status == DOWNLOADED) if (file_status == DOWNLOADED)
@ -113,7 +112,7 @@ ErrorCodes::ErrorCode RemoteCacheController::waitMoreData(size_t start_offset_,
if (start_offset_ >= current_offset) if (start_offset_ >= current_offset)
{ {
lock.unlock(); lock.unlock();
return ErrorCodes::END_OF_FILE; return;
} }
} }
else // block until more data is ready else // block until more data is ready
@ -121,18 +120,17 @@ ErrorCodes::ErrorCode RemoteCacheController::waitMoreData(size_t start_offset_,
if (current_offset >= end_offset_) if (current_offset >= end_offset_)
{ {
lock.unlock(); lock.unlock();
return ErrorCodes::OK; return;
} }
else else
more_data_signal.wait(lock, [this, end_offset_] { return file_status == DOWNLOADED || current_offset >= end_offset_; }); more_data_signal.wait(lock, [this, end_offset_] { return file_status == DOWNLOADED || current_offset >= end_offset_; });
} }
lock.unlock(); lock.unlock();
return ErrorCodes::OK;
} }
bool RemoteCacheController::isModified(IRemoteFileMetadataPtr file_metadata_) bool RemoteCacheController::isModified(IRemoteFileMetadataPtr file_metadata_)
{ {
return !(file_metadata_ptr->getVersion() == file_metadata_->getVersion()); return file_metadata_ptr->getVersion() != file_metadata_->getVersion();
} }
void RemoteCacheController::startBackgroundDownload(std::unique_ptr<ReadBuffer> in_readbuffer_, BackgroundSchedulePool & thread_pool) void RemoteCacheController::startBackgroundDownload(std::unique_ptr<ReadBuffer> in_readbuffer_, BackgroundSchedulePool & thread_pool)

View File

@ -58,7 +58,7 @@ public:
* enough data be downloaded. * enough data be downloaded.
* If the file has finished download, the process would unblocked * If the file has finished download, the process would unblocked
*/ */
ErrorCodes::ErrorCode waitMoreData(size_t start_offset_, size_t end_offset_); void waitMoreData(size_t start_offset_, size_t end_offset_);
inline size_t size() const { return current_offset; } inline size_t size() const { return current_offset; }
@ -85,7 +85,6 @@ public:
private: private:
// flush file and status information // flush file and status information
void flush(bool need_flush_status = false); void flush(bool need_flush_status = false);
bool loadInnerInformation(const std::filesystem::path & file_path);
BackgroundSchedulePool::TaskHolder download_task_holder; BackgroundSchedulePool::TaskHolder download_task_holder;
void backgroundDownload(ReadBufferPtr remote_read_buffer); void backgroundDownload(ReadBufferPtr remote_read_buffer);

View File

@ -13,7 +13,7 @@ struct RemoteFileCacheEvictPolicy
{ {
bool canRelease(std::shared_ptr<RemoteCacheController> cache) const bool canRelease(std::shared_ptr<RemoteCacheController> cache) const
{ {
return (!cache || cache->closable()); return !cache || cache->closable();
} }
void release(std::shared_ptr<RemoteCacheController> cache) void release(std::shared_ptr<RemoteCacheController> cache)
{ {

View File

@ -161,9 +161,14 @@ public:
/// Use local cache for remote storage if enabled. /// Use local cache for remote storage if enabled.
std::unique_ptr<ReadBuffer> remote_read_buf; std::unique_ptr<ReadBuffer> remote_read_buf;
if (ExternalDataSourceCache::instance().isInitialized() && getContext()->getSettingsRef().use_local_cache_for_remote_storage) if (ExternalDataSourceCache::instance().isInitialized() && getContext()->getSettingsRef().use_local_cache_for_remote_storage)
{
size_t buff_size = raw_read_buf->internalBuffer().size();
if (buff_size == 0)
buff_size = DBMS_DEFAULT_BUFFER_SIZE;
remote_read_buf = RemoteReadBuffer::create(getContext(), remote_read_buf = RemoteReadBuffer::create(getContext(),
std::make_shared<StorageHiveMetadata>("Hive", getNameNodeCluster(hdfs_namenode_url), uri_with_path, curr_file->getSize(), curr_file->getLastModTs()), std::make_shared<StorageHiveMetadata>("Hive", getNameNodeCluster(hdfs_namenode_url), uri_with_path, curr_file->getSize(), curr_file->getLastModTs()),
std::move(raw_read_buf)); std::move(raw_read_buf), buff_size);
}
else else
remote_read_buf = std::move(raw_read_buf); remote_read_buf = std::move(raw_read_buf);