From 078521496a765fe9a27b39d601e8c8abe6dfd421 Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Mon, 27 Dec 2021 15:04:26 +0800 Subject: [PATCH] modifications based on pr review --- src/Common/LRUCache.h | 34 +++++------- .../Cache/ExternalDataSourceCache.cpp | 24 ++------- src/Storages/Cache/ExternalDataSourceCache.h | 2 +- src/Storages/Cache/RemoteCacheController.cpp | 54 +++++++++---------- src/Storages/Cache/RemoteCacheController.h | 3 +- src/Storages/Cache/RemoteFileCachePolicy.h | 2 +- src/Storages/Hive/StorageHive.cpp | 7 ++- 7 files changed, 54 insertions(+), 72 deletions(-) diff --git a/src/Common/LRUCache.h b/src/Common/LRUCache.h index c503619a7d0..2ca5e007716 100644 --- a/src/Common/LRUCache.h +++ b/src/Common/LRUCache.h @@ -7,16 +7,10 @@ #include #include #include -#include namespace DB { -namespace ErrorCodes -{ - extern const int CANNOT_RELEASE; -} - template struct TrivialWeightFunction { @@ -27,15 +21,13 @@ struct TrivialWeightFunction }; template -struct TrivialLRUCacheEvitPolicy +struct TrivialLRUCacheEvictPolicy { - // To note that the arg could be null inline bool canRelease(std::shared_ptr) const { return true; } - // To note that the arg could be null inline void release(std::shared_ptr) { } @@ -51,13 +43,21 @@ template , typename WeightFunction = TrivialWeightFunction, - typename EvictPolicy = TrivialLRUCacheEvitPolicy> + typename EvictPolicy = TrivialLRUCacheEvictPolicy> class LRUCache { public: using Key = TKey; using Mapped = TMapped; using MappedPtr = std::shared_ptr; + + struct Result + { + MappedPtr value; + bool cache_miss = true; + // set_successful is not trustworthy for getOrSet, because removeOverflow is called right after putting key in cache + bool set_successful = false; + }; /** Initialize LRUCache with max_size and max_elements_size. * max_elements_size == 0 means no elements size restrictions. @@ -97,12 +97,11 @@ public: return setImpl(key, mapped, lock); } - /// Returns std::pair of the cached value and a bool indicating whether the value was produced during this call. template std::pair getOrSet(const Key & key, LoadFunc && load_func) { - auto [value, is_loaded, _] = getOrTrySet(key, std::move(load_func)); - return std::make_pair(value, is_loaded); + auto result = getOrTrySet(key, std::move(load_func)); + return std::make_pair(result.value, result.cache_miss); } /// If the value for the key is in the cache, returns it. If it is not, calls load_func() to @@ -112,12 +111,8 @@ public: /// Exceptions occurring in load_func will be propagated to the caller. Another thread from the /// set of concurrent threads will then try to call its load_func etc. /// - /// return std::tuple is , where - /// - is_value_loaded indicates whether the value was produce during this call - /// - is_value_updated indicates whether the value is updated in the cache when is_value_loaded = true. - /// if is_value_loaded = false, is_value_updated = false template - std::tuple getOrTrySet(const Key &key, LoadFunc && load_func) + Result getOrTrySet(const Key &key, LoadFunc && load_func) { InsertTokenHolder token_holder; { @@ -353,7 +348,6 @@ private: if (inserted) { auto value_weight = mapped ? weight_function(*mapped) : 0; - // move removeOverflow() ahead here. In default, the final result is the same as the old implementation if (!removeOverflow(value_weight)) { // cannot find enough space to put in the new value @@ -375,7 +369,7 @@ private: { if (!evict_policy.canRelease(cell.value)) { - // the old value is refered by someone, cannot release now + // the old value is referred by someone, cannot release now // in default policy, it is always true. return false; } diff --git a/src/Storages/Cache/ExternalDataSourceCache.cpp b/src/Storages/Cache/ExternalDataSourceCache.cpp index 872ce815e58..f8d5c95f859 100644 --- a/src/Storages/Cache/ExternalDataSourceCache.cpp +++ b/src/Storages/Cache/ExternalDataSourceCache.cpp @@ -41,23 +41,8 @@ RemoteReadBuffer::~RemoteReadBuffer() file_cache_controller->deallocFile(std::move(file_buffer)); } -std::unique_ptr RemoteReadBuffer::create(ContextPtr context, IRemoteFileMetadataPtr remote_file_metadata, std::unique_ptr read_buffer) +std::unique_ptr RemoteReadBuffer::create(ContextPtr context, IRemoteFileMetadataPtr remote_file_metadata, std::unique_ptr read_buffer, size_t buff_size) { - auto * log = &Poco::Logger::get("RemoteReadBuffer"); - size_t buff_size = DBMS_DEFAULT_BUFFER_SIZE; - if (read_buffer) - buff_size = read_buffer->internalBuffer().size(); - /* - * in the new implement of ReadBufferFromHDFS, buffer size is 0. - * - * in the common case, we don't read bytes from readbuffer directly, so set buff_size = DBMS_DEFAULT_BUFFER_SIZE - * is OK. - * - * we need be careful with the case without local file reader. - */ - if (buff_size == 0) - buff_size = DBMS_DEFAULT_BUFFER_SIZE; - auto remote_path = remote_file_metadata->remote_path; auto remote_read_buffer = std::make_unique(buff_size); ErrorCodes::ErrorCode error; @@ -65,8 +50,6 @@ std::unique_ptr RemoteReadBuffer::create(ContextPtr context, IRemote std::tie(remote_read_buffer->file_cache_controller, read_buffer, error) = ExternalDataSourceCache::instance().createReader(context, remote_file_metadata, read_buffer); if (remote_read_buffer->file_cache_controller == nullptr) { - LOG_ERROR(log, "Failed to allocate local file for remote path: {}, reason: {}.", remote_path, error); - // read_buffer is the input one. return read_buffer; } else @@ -120,7 +103,10 @@ off_t RemoteReadBuffer::getPosition() ExternalDataSourceCache::ExternalDataSourceCache() = default; -ExternalDataSourceCache::~ExternalDataSourceCache() = default; +ExternalDataSourceCache::~ExternalDataSourceCache() +{ + recover_task_holder->deactivate(); +} ExternalDataSourceCache & ExternalDataSourceCache::instance() { diff --git a/src/Storages/Cache/ExternalDataSourceCache.h b/src/Storages/Cache/ExternalDataSourceCache.h index 9f350da65ca..24c1bcd8607 100644 --- a/src/Storages/Cache/ExternalDataSourceCache.h +++ b/src/Storages/Cache/ExternalDataSourceCache.h @@ -34,7 +34,7 @@ class RemoteReadBuffer : public BufferWithOwnMemory public: explicit RemoteReadBuffer(size_t buff_size); ~RemoteReadBuffer() override; - static std::unique_ptr create(ContextPtr contex, IRemoteFileMetadataPtr remote_file_metadata, std::unique_ptr read_buffer); + static std::unique_ptr create(ContextPtr contex, IRemoteFileMetadataPtr remote_file_metadata, std::unique_ptr read_buffer, size_t buff_size); bool nextImpl() override; off_t seek(off_t off, int whence) override; diff --git a/src/Storages/Cache/RemoteCacheController.cpp b/src/Storages/Cache/RemoteCacheController.cpp index 3f866415a0b..0cee2733bee 100644 --- a/src/Storages/Cache/RemoteCacheController.cpp +++ b/src/Storages/Cache/RemoteCacheController.cpp @@ -1,6 +1,8 @@ #include #include #include +#include +#include #include #include #include @@ -15,20 +17,6 @@ namespace ErrorCodes extern const int OK; extern const int BAD_ARGUMENTS; extern const int LOGICAL_ERROR; - extern const int END_OF_FILE; -} - -bool RemoteCacheController::loadInnerInformation(const fs::path & file_path) -{ - if (!fs::exists(file_path)) - return false; - std::ifstream info_file(file_path); - Poco::JSON::Parser info_parser; - auto info_json = info_parser.parse(info_file).extract(); - file_status = static_cast(info_json->get("file_status").convert()); - metadata_class = info_json->get("metadata_class").convert(); - info_file.close(); - return true; } std::shared_ptr RemoteCacheController::recover(const std::filesystem::path & local_path_) @@ -37,13 +25,12 @@ std::shared_ptr RemoteCacheController::recover(const std: if (!std::filesystem::exists(local_path_ / "data.bin")) { - LOG_TRACE(log, "Invalid cached directory:{}", local_path_.string()); + LOG_TRACE(log, "Invalid cached directory: {}", local_path_.string()); return nullptr; } auto cache_controller = std::make_shared(nullptr, local_path_, 0); - if (!cache_controller->loadInnerInformation(local_path_ / "info.txt") - || cache_controller->file_status != DOWNLOADED) + if (cache_controller->file_status != DOWNLOADED) { LOG_INFO(log, "Recover cached file failed. local path:{}", local_path_.string()); return nullptr; @@ -67,12 +54,11 @@ std::shared_ptr RemoteCacheController::recover(const std: local_path_.string()); throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid metadata class:{}", cache_controller->metadata_class); } - std::ifstream metadata_file(local_path_ / "metadata.txt"); - if (!cache_controller->file_metadata_ptr->fromString(std::string((std::istreambuf_iterator(metadata_file)), - std::istreambuf_iterator()))) + ReadBufferFromFile file_readbuffer((local_path_ / "metadata.txt").string()); + std::string metadata_content; + readStringUntilEOF(metadata_content, file_readbuffer); + if (!cache_controller->file_metadata_ptr->fromString(metadata_content)) { - LOG_ERROR(log, "Cannot load the metadata. The cached file is invalid and will be remove. path:{}", - local_path_.string()); throw Exception(ErrorCodes::LOGICAL_ERROR, "Invalid metadata file({}) for meta class {}", local_path_.string(), cache_controller->metadata_class); } @@ -93,7 +79,7 @@ RemoteCacheController::RemoteCacheController( , current_offset(0) { // on recover, file_metadata_ptr is null, but it will be allocated after loading from metadata.txt - // when we allocate a whole new file cache , file_metadata_ptr must not be null. + // when we allocate a whole new file cache,file_metadata_ptr must not be null. if (file_metadata_ptr) { metadata_class = file_metadata_ptr->getName(); @@ -102,9 +88,22 @@ RemoteCacheController::RemoteCacheController( metadata_file_writer->write(str_buf.c_str(), str_buf.size()); metadata_file_writer->close(); } + else + { + auto info_path = local_path_ / "info.txt"; + if (fs::exists(info_path)) + { + std::ifstream info_file(info_path); + Poco::JSON::Parser info_parser; + auto info_json = info_parser.parse(info_file).extract(); + file_status = static_cast(info_json->get("file_status").convert()); + metadata_class = info_json->get("metadata_class").convert(); + info_file.close(); + } + } } -ErrorCodes::ErrorCode RemoteCacheController::waitMoreData(size_t start_offset_, size_t end_offset_) +void RemoteCacheController::waitMoreData(size_t start_offset_, size_t end_offset_) { std::unique_lock lock{mutex}; if (file_status == DOWNLOADED) @@ -113,7 +112,7 @@ ErrorCodes::ErrorCode RemoteCacheController::waitMoreData(size_t start_offset_, if (start_offset_ >= current_offset) { lock.unlock(); - return ErrorCodes::END_OF_FILE; + return; } } else // block until more data is ready @@ -121,18 +120,17 @@ ErrorCodes::ErrorCode RemoteCacheController::waitMoreData(size_t start_offset_, if (current_offset >= end_offset_) { lock.unlock(); - return ErrorCodes::OK; + return; } else more_data_signal.wait(lock, [this, end_offset_] { return file_status == DOWNLOADED || current_offset >= end_offset_; }); } lock.unlock(); - return ErrorCodes::OK; } bool RemoteCacheController::isModified(IRemoteFileMetadataPtr file_metadata_) { - return !(file_metadata_ptr->getVersion() == file_metadata_->getVersion()); + return file_metadata_ptr->getVersion() != file_metadata_->getVersion(); } void RemoteCacheController::startBackgroundDownload(std::unique_ptr in_readbuffer_, BackgroundSchedulePool & thread_pool) diff --git a/src/Storages/Cache/RemoteCacheController.h b/src/Storages/Cache/RemoteCacheController.h index 19dd49d5182..c5dc4a510b6 100644 --- a/src/Storages/Cache/RemoteCacheController.h +++ b/src/Storages/Cache/RemoteCacheController.h @@ -58,7 +58,7 @@ public: * enough data be downloaded. * If the file has finished download, the process would unblocked */ - ErrorCodes::ErrorCode waitMoreData(size_t start_offset_, size_t end_offset_); + void waitMoreData(size_t start_offset_, size_t end_offset_); inline size_t size() const { return current_offset; } @@ -85,7 +85,6 @@ public: private: // flush file and status information void flush(bool need_flush_status = false); - bool loadInnerInformation(const std::filesystem::path & file_path); BackgroundSchedulePool::TaskHolder download_task_holder; void backgroundDownload(ReadBufferPtr remote_read_buffer); diff --git a/src/Storages/Cache/RemoteFileCachePolicy.h b/src/Storages/Cache/RemoteFileCachePolicy.h index 4866247ee6e..ebdb2c4bc48 100644 --- a/src/Storages/Cache/RemoteFileCachePolicy.h +++ b/src/Storages/Cache/RemoteFileCachePolicy.h @@ -13,7 +13,7 @@ struct RemoteFileCacheEvictPolicy { bool canRelease(std::shared_ptr cache) const { - return (!cache || cache->closable()); + return !cache || cache->closable(); } void release(std::shared_ptr cache) { diff --git a/src/Storages/Hive/StorageHive.cpp b/src/Storages/Hive/StorageHive.cpp index f4536c07d93..af357e13ca7 100644 --- a/src/Storages/Hive/StorageHive.cpp +++ b/src/Storages/Hive/StorageHive.cpp @@ -161,9 +161,14 @@ public: /// Use local cache for remote storage if enabled. std::unique_ptr remote_read_buf; if (ExternalDataSourceCache::instance().isInitialized() && getContext()->getSettingsRef().use_local_cache_for_remote_storage) + { + size_t buff_size = raw_read_buf->internalBuffer().size(); + if (buff_size == 0) + buff_size = DBMS_DEFAULT_BUFFER_SIZE; remote_read_buf = RemoteReadBuffer::create(getContext(), std::make_shared("Hive", getNameNodeCluster(hdfs_namenode_url), uri_with_path, curr_file->getSize(), curr_file->getLastModTs()), - std::move(raw_read_buf)); + std::move(raw_read_buf), buff_size); + } else remote_read_buf = std::move(raw_read_buf);