diff --git a/src/IO/RemoteFileMetaDataBase.cpp b/src/IO/RemoteFileMetaDataBase.cpp new file mode 100644 index 00000000000..e041bdce29a --- /dev/null +++ b/src/IO/RemoteFileMetaDataBase.cpp @@ -0,0 +1,35 @@ +#include +#include +namespace DB +{ +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + +RemoteFileMetaDataBase::~RemoteFileMetaDataBase() {} + +RemoteFileMetaDataFactory & RemoteFileMetaDataFactory::instance() +{ + static RemoteFileMetaDataFactory g_factory; + return g_factory; +} + +RemoteFileMetaDataBasePtr RemoteFileMetaDataFactory::create_class(const String & class_name) +{ + auto it = class_creators.find(class_name); + if (it == class_creators.end()) + return nullptr; + return (it->second)(); +} + +void RemoteFileMetaDataFactory::register_class(const String & class_name, ClassCreator creator) +{ + auto it = class_creators.find(class_name); + if (it != class_creators.end()) + { + throw Exception(ErrorCodes::LOGICAL_ERROR, "Class ({}) has been registered. It is a fatal error.", class_name); + } + class_creators[class_name] = creator; +} +} diff --git a/src/IO/RemoteFileMetaDataBase.h b/src/IO/RemoteFileMetaDataBase.h new file mode 100644 index 00000000000..d2acbd0af96 --- /dev/null +++ b/src/IO/RemoteFileMetaDataBase.h @@ -0,0 +1,87 @@ +#pragma once +#include +#include +#include +#include +#include + +namespace DB +{ +class RemoteFileMetaDataBase +{ +public: + RemoteFileMetaDataBase() = default; + RemoteFileMetaDataBase(const String & schema_, + const String & cluster_, + const String & remote_path_, + size_t file_size_, + UInt64 last_modification_timestamp_): + schema(schema_) + ,cluster(cluster_) + ,remote_path(remote_path_) + ,file_size(file_size_) + ,last_modification_timestamp(last_modification_timestamp_) + { + } + virtual ~RemoteFileMetaDataBase(); + virtual String getClassName() = 0; //class name + // methods for basic information + inline String getSchema() { return schema; } + inline String getCluster() { return cluster; } + inline size_t getFileSize() { return file_size; } + inline String getRemotePath() { return remote_path; } + inline UInt64 getLastModificationTimestamp() { return last_modification_timestamp; } + // create a new object + virtual std::shared_ptr clone() = 0; + + // deserialize + virtual bool fromString(const String &buf) = 0; + // serialize + virtual String toString() = 0; + // to compare two meta datas for detecting file changes + virtual bool equal(std::shared_ptr b) = 0; +protected: + String schema; + String cluster; + String remote_path; + size_t file_size = 0; + UInt64 last_modification_timestamp = 0; +}; + +using RemoteFileMetaDataBasePtr = std::shared_ptr; + +/* + * How to register a subclass into factory and use it ? + * 1) define your own subclass derive from RemoteFileMetaDataBase. Notice! the getClassName() must be the same + as your subclass name. + * 2) in a .cpp file, call REGISTTER_REMOTE_FILE_META_DATA_CLASS(subclass). + 3) call RemoteFileMetaDataFactory::instance().create_class(subclass_name) where you want to make a new object + */ + +class RemoteFileMetaDataFactory : private boost::noncopyable +{ +public: + using ClassCreator = std::function; + ~RemoteFileMetaDataFactory() = default; + + static RemoteFileMetaDataFactory & instance(); + RemoteFileMetaDataBasePtr create_class(const String & class_name); + void register_class(const String &class_name, ClassCreator creator); +protected: + RemoteFileMetaDataFactory() = default; + +private: + std::unordered_map class_creators; +}; + +// this should be used in a .cpp file. All the subclasses will finish the registeration before the main() +#define REGISTTER_REMOTE_FILE_META_DATA_CLASS(meta_data_class) \ + class FileMetaDataFactory##meta_data_class{\ + public:\ + FileMetaDataFactory##meta_data_class(){\ + auto creator = []() -> RemoteFileMetaDataBasePtr { return std::make_shared(); };\ + RemoteFileMetaDataFactory::instance().register_class(#meta_data_class, creator);\ + }\ + };\ + static FileMetaDataFactory##meta_data_class g_file_meta_data_factory_instance##meta_data_class; +} diff --git a/src/IO/RemoteReadBufferCache.cpp b/src/IO/RemoteReadBufferCache.cpp index 33c3fd0dfa7..3a482e90bad 100644 --- a/src/IO/RemoteReadBufferCache.cpp +++ b/src/IO/RemoteReadBufferCache.cpp @@ -26,104 +26,85 @@ namespace ErrorCodes extern const int CANNOT_CREATE_DIRECTORY; } -bool RemoteFileMetadata::load(const std::filesystem::path & local_path) +bool RemoteCacheController::loadInnerInformation(const fs::path & file_path) { - auto * log = &Poco::Logger::get("RemoteFileMetadata"); - if (!std::filesystem::exists(local_path)) - { - LOG_ERROR(log, "file path not exists:{}", local_path.string()); + if (!fs::exists(file_path)) return false; - } - std::ifstream meta_fs(local_path.string()); - Poco::JSON::Parser meta_data_parser; - auto meta_data_jobj = meta_data_parser.parse(meta_fs).extract(); - remote_path = meta_data_jobj->get("remote_path").convert(); - schema = meta_data_jobj->get("schema").convert(); - cluster = meta_data_jobj->get("cluster").convert(); - status = static_cast(meta_data_jobj->get("status").convert()); - last_modification_timestamp = meta_data_jobj->get("last_modification_timestamp").convert(); - file_size = meta_data_jobj->get("file_size").convert(); - meta_fs.close(); - + std::ifstream info_file(file_path); + Poco::JSON::Parser info_parser; + auto info_jobj = info_parser.parse(info_file).extract(); + file_status = static_cast(info_jobj->get("file_status").convert()); + meta_data_class = info_jobj->get("meta_data_class").convert(); + info_file.close(); return true; } -void RemoteFileMetadata::save(const std::filesystem::path & local_path) const -{ - std::ofstream meta_file(local_path.string(), std::ios::out); - meta_file << toString(); - meta_file.close(); -} - -String RemoteFileMetadata::toString() const -{ - Poco::JSON::Object jobj; - jobj.set("schema", schema); - jobj.set("cluster", cluster); - jobj.set("remote_path", remote_path); - jobj.set("status", static_cast(status)); - jobj.set("last_modification_timestamp", last_modification_timestamp); - jobj.set("file_size", file_size); - std::stringstream buf; // STYLE_CHECK_ALLOW_STD_STRING_STREAM - jobj.stringify(buf); - return buf.str(); -} - std::shared_ptr RemoteCacheController::recover(const std::filesystem::path & local_path_) { auto * log = &Poco::Logger::get("RemoteCacheController"); - if (!std::filesystem::exists(local_path_) || !std::filesystem::exists(local_path_ / "data.bin")) + if (!std::filesystem::exists(local_path_ / "data.bin")) { LOG_TRACE(log, "Invalid cached directory:{}", local_path_.string()); return nullptr; } - RemoteFileMetadata remote_file_meta_data; - if (!remote_file_meta_data.load(local_path_ / "meta.txt") || remote_file_meta_data.status != RemoteFileMetadata::DOWNLOADED) + auto cache_controller = std::make_shared(nullptr, local_path_, 0); + if (!cache_controller->loadInnerInformation(local_path_ / "info.txt") + || cache_controller->file_status != DOWNLOADED) { - LOG_INFO(log, "recover cached file failed. local path:{}, file meta data:{}", local_path_.string(), remote_file_meta_data.toString()); + LOG_INFO(log, "recover cached file failed. local path:{}", local_path_.string()); return nullptr; } - auto cache_controller = std::make_shared(nullptr, remote_file_meta_data, local_path_, 0, nullptr); - cache_controller->current_offset = remote_file_meta_data.file_size; + cache_controller->file_meta_data_ptr = RemoteFileMetaDataFactory::instance().create_class(cache_controller->meta_data_class); + if (!cache_controller->file_meta_data_ptr) + { + // do not load this invalid cached file and clear it + LOG_ERROR(log, "Cannot create the meta data class : {}. The cached file is invalid and will be remove. path:{}", + cache_controller->meta_data_class, + local_path_.string()); + fs::remove_all(local_path_); + return nullptr; + } + std::ifstream meta_data_file(local_path_ / "meta_data.txt"); + if (!cache_controller->file_meta_data_ptr->fromString(std::string((std::istreambuf_iterator(meta_data_file)), + std::istreambuf_iterator()))) + { + LOG_ERROR(log, "Cannot load the meta data. The cached file is invalid and will be remove. path:{}", + local_path_.string()); + fs::remove_all(local_path_); + return nullptr; + } - RemoteReadBufferCache::instance().updateTotalSize(cache_controller->file_meta_data.file_size); + cache_controller->current_offset = fs::file_size(local_path_ / "data.bin"); + + RemoteReadBufferCache::instance().updateTotalSize(cache_controller->file_meta_data_ptr->getFileSize()); return cache_controller; } RemoteCacheController::RemoteCacheController( - ContextPtr context, - const RemoteFileMetadata & file_meta_data_, + RemoteFileMetaDataBasePtr file_meta_data_, const std::filesystem::path & local_path_, - size_t cache_bytes_before_flush_, - std::shared_ptr read_buffer_) - : file_meta_data(file_meta_data_) + size_t cache_bytes_before_flush_) + : file_meta_data_ptr(file_meta_data_) , local_path(local_path_) , valid(true) , local_cache_bytes_read_before_flush(cache_bytes_before_flush_) , current_offset(0) - , remote_read_buffer(read_buffer_) { - /// readbuffer == nullptr if `RemoteCacheController` is created in `initOnce`, when metadata and local cache already exist. - if (remote_read_buffer) + if (file_meta_data_ptr) { - // setup local files - data_file_writer = std::make_unique((fs::path(local_path_) / "data.bin").string()); - data_file_writer->sync(); - - file_meta_data.save(local_path_ / "meta.txt"); - - download_task_holder = context->getSchedulePool().createTask("download remote file", [this]{ this->backgroundDownload(); }); - download_task_holder->activateAndSchedule(); + std::ofstream meta_data_file(local_path_ / "meta_data.txt", std::ios::out); + meta_data_file << file_meta_data_ptr->toString(); + meta_data_file.close(); } } RemoteReadBufferCacheError RemoteCacheController::waitMoreData(size_t start_offset_, size_t end_offset_) { std::unique_lock lock{mutex}; - if (file_meta_data.status == RemoteFileMetadata::DOWNLOADED) + if (file_status == DOWNLOADED) { // finish reading if (start_offset_ >= current_offset) @@ -140,15 +121,29 @@ RemoteReadBufferCacheError RemoteCacheController::waitMoreData(size_t start_offs return RemoteReadBufferCacheError::OK; } else - more_data_signal.wait(lock, [this, end_offset_] { return file_meta_data.status == RemoteFileMetadata::DOWNLOADED || current_offset >= end_offset_; }); + more_data_signal.wait(lock, [this, end_offset_] { return file_status == DOWNLOADED || current_offset >= end_offset_; }); } lock.unlock(); return RemoteReadBufferCacheError::OK; } -void RemoteCacheController::backgroundDownload() +bool RemoteCacheController::checkFileChanged(RemoteFileMetaDataBasePtr file_meta_data_) { - file_meta_data.status = RemoteFileMetadata::DOWNLOADING; + return !file_meta_data_ptr->equal(file_meta_data_); +} + +void RemoteCacheController::startBackgroundDownload(std::shared_ptr input_readbuffer, BackgroundSchedulePool & thread_pool) +{ + data_file_writer = std::make_unique((fs::path(local_path) / "data.bin").string()); + flush(true); + download_task_holder = thread_pool.createTask("download remote file", + [this,input_readbuffer]{ backgroundDownload(input_readbuffer); }); + download_task_holder->activateAndSchedule(); +} + +void RemoteCacheController::backgroundDownload(std::shared_ptr remote_read_buffer) +{ + file_status = DOWNLOADING; size_t before_unflush_bytes = 0; size_t total_bytes = 0; while (!remote_read_buffer->eof()) @@ -172,27 +167,33 @@ void RemoteCacheController::backgroundDownload() } std::unique_lock lock(mutex); current_offset += total_bytes; - file_meta_data.status = RemoteFileMetadata::DOWNLOADED; + file_status = DOWNLOADED; flush(true); data_file_writer.reset(); - remote_read_buffer.reset(); lock.unlock(); more_data_signal.notify_all(); - RemoteReadBufferCache::instance().updateTotalSize(file_meta_data.file_size); - LOG_TRACE(log, "Finish download into local path: {}, file meta data:{} ", local_path.string(), file_meta_data.toString()); + RemoteReadBufferCache::instance().updateTotalSize(file_meta_data_ptr->getFileSize()); + LOG_TRACE(log, "Finish download into local path: {}, file meta data:{} ", local_path.string(), file_meta_data_ptr->toString()); } -void RemoteCacheController::flush(bool need_flush_meta_data_) +void RemoteCacheController::flush(bool need_flush_status) { if (data_file_writer) { + LOG_DEBUG(&Poco::Logger::get("RemoteCacheController"),"flush file. offset:{}, file:{}. total_size:{}", current_offset, local_path.string(), file_meta_data_ptr->getFileSize()); data_file_writer->sync(); } - - if (!need_flush_meta_data_) - return; - - file_meta_data.save(local_path / "meta.txt"); + if (need_flush_status) + { + Poco::JSON::Object jobj; + jobj.set("file_status", static_cast(file_status)); + jobj.set("meta_data_class", meta_data_class); + std::stringstream buf; // STYLE_CHECK_ALLOW_STD_STRING_STREAM + jobj.stringify(buf); + std::ofstream info_file(local_path / "info.txt"); + info_file << buf.str(); + info_file.close(); + } } RemoteCacheController::~RemoteCacheController() = default; @@ -200,7 +201,7 @@ RemoteCacheController::~RemoteCacheController() = default; void RemoteCacheController::close() { // delete directory - LOG_TRACE(log, "Removing the local cache. local path: {}, file meta data:{}", local_path.string(), file_meta_data.toString()); + LOG_TRACE(log, "Removing the local cache. local path: {}", local_path.string()); std::filesystem::remove_all(local_path); } @@ -233,7 +234,7 @@ void RemoteCacheController::deallocFile(std::unique_ptr throw Exception( ErrorCodes::BAD_ARGUMENTS, "Try to deallocate file with invalid handler remote path: {}, local path: {}", - file_meta_data.remote_path, + file_meta_data_ptr->getRemotePath(), local_path.string()); } opened_file_buffer_refs.erase(it); @@ -249,7 +250,7 @@ RemoteReadBuffer::~RemoteReadBuffer() file_cache_controller->deallocFile(std::move(file_buffer)); } -std::unique_ptr RemoteReadBuffer::create(ContextPtr context, const RemoteFileMetadata & remote_file_meta, std::unique_ptr read_buffer) +std::unique_ptr RemoteReadBuffer::create(ContextPtr context, RemoteFileMetaDataBasePtr remote_file_meta_data, std::unique_ptr read_buffer) { auto * log = &Poco::Logger::get("RemoteReadBuffer"); size_t buff_size = DBMS_DEFAULT_BUFFER_SIZE; @@ -266,13 +267,13 @@ std::unique_ptr RemoteReadBuffer::create(ContextPtr context, c if (buff_size == 0) buff_size = DBMS_DEFAULT_BUFFER_SIZE; - const auto & remote_path = remote_file_meta.remote_path; + auto remote_path = remote_file_meta_data->getRemotePath(); auto remote_read_buffer = std::make_unique(buff_size); auto * raw_readbuffer_ptr = read_buffer.release(); std::shared_ptr shared_readbuffer_ptr(raw_readbuffer_ptr); RemoteReadBufferCacheError error; - std::tie(remote_read_buffer->file_cache_controller, error) = RemoteReadBufferCache::instance().createReader(context, remote_file_meta, shared_readbuffer_ptr); + std::tie(remote_read_buffer->file_cache_controller, error) = RemoteReadBufferCache::instance().createReader(context, remote_file_meta_data, shared_readbuffer_ptr); if (remote_read_buffer->file_cache_controller == nullptr) { LOG_ERROR(log, "Failed to allocate local file for remote path: {}, reason: {}", remote_path, error); @@ -293,7 +294,10 @@ bool RemoteReadBuffer::nextImpl() if (file_buffer) { auto start_offset = file_buffer->getPosition(); - auto end_offset = file_buffer->internalBuffer().size(); + auto end_offset = start_offset + file_buffer->internalBuffer().size(); + LOG_DEBUG(&Poco::Logger::get("RemoteReadBuffer"), "nextImpl. start:{}, end:{}, file:{}, total_size:{}, remote_path:{}", + start_offset, end_offset, file_buffer->getFileName(), file_cache_controller->getFileMetaData()->getFileSize(), + file_cache_controller->getFileMetaData()->getRemotePath()); file_cache_controller->waitMoreData(start_offset, end_offset); auto status = file_buffer->next(); @@ -322,9 +326,11 @@ off_t RemoteReadBuffer::seek(off_t offset, int whence) { /* * Need to wait here. For example, the current file has been download at position X, but here we try to seek to - * postition Y ( Y > X), it would fail. + * postition Y (Y > X), it would fail. */ file_cache_controller->waitMoreData(offset, offset + file_buffer->internalBuffer().size()); + LOG_DEBUG(&Poco::Logger::get("RemoteReadBuffer"), "seek. offset:{}. file:{}, total_size:{}", offset, file_buffer->getFileName(), + file_cache_controller->getFileMetaData()->getFileSize()); auto ret = file_buffer->seek(offset, whence); BufferBase::set(file_buffer->buffer().begin(), file_buffer->buffer().size(), @@ -405,18 +411,18 @@ void RemoteReadBufferCache::initOnce( recover_task_holder->activateAndSchedule(); } -String RemoteReadBufferCache::calculateLocalPath(const RemoteFileMetadata & meta) const +String RemoteReadBufferCache::calculateLocalPath(RemoteFileMetaDataBasePtr meta_data) const { - String full_path = meta.schema + ":" + meta.cluster + ":" + meta.remote_path; + String full_path = meta_data->getSchema() + ":" + meta_data->getCluster() + ":" + meta_data->getRemotePath(); UInt128 hashcode = sipHash128(full_path.c_str(), full_path.size()); String hashcode_str = getHexUIntLowercase(hashcode); return fs::path(root_dir) / hashcode_str.substr(0, 3) / hashcode_str; } std::pair -RemoteReadBufferCache::createReader(ContextPtr context, const RemoteFileMetadata & remote_file_meta, std::shared_ptr & read_buffer) +RemoteReadBufferCache::createReader(ContextPtr context, RemoteFileMetaDataBasePtr remote_file_meta_data, std::shared_ptr & read_buffer) { - LOG_TRACE(log, "createReader. {} {} {}", remote_file_meta.remote_path, remote_file_meta.last_modification_timestamp, remote_file_meta.file_size); + LOG_TRACE(log, "createReader. {}", remote_file_meta_data->toString()); // If something is wrong on startup, rollback to read from the original ReadBuffer if (!isInitialized()) { @@ -424,15 +430,15 @@ RemoteReadBufferCache::createReader(ContextPtr context, const RemoteFileMetadata return {nullptr, RemoteReadBufferCacheError::NOT_INIT}; } - auto remote_path = remote_file_meta.remote_path; - const auto & last_modification_timestamp = remote_file_meta.last_modification_timestamp; - auto local_path = calculateLocalPath(remote_file_meta); + auto remote_path = remote_file_meta_data->getRemotePath(); + const auto & last_modification_timestamp = remote_file_meta_data->getLastModificationTimestamp(); + auto local_path = calculateLocalPath(remote_file_meta_data); std::lock_guard lock(mutex); auto cache_iter = caches.find(local_path); if (cache_iter != caches.end()) { // if the file has been update on remote side, we need to redownload it - if (cache_iter->second.cache_controller->getLastModificationTimestamp() != last_modification_timestamp) + if (cache_iter->second.cache_controller->checkFileChanged(remote_file_meta_data)) { LOG_TRACE( log, @@ -479,9 +485,9 @@ RemoteReadBufferCache::createReader(ContextPtr context, const RemoteFileMetadata fs::create_directories(local_path); - // pass a session context into RemoteCacheController is not a good idea auto cache_controller - = std::make_shared(context->getGlobalContext(), remote_file_meta, local_path, local_cache_bytes_read_before_flush, read_buffer); + = std::make_shared(remote_file_meta_data, local_path, local_cache_bytes_read_before_flush); + cache_controller->startBackgroundDownload(read_buffer, context->getSchedulePool()); CacheCell cache_cell; cache_cell.cache_controller = cache_controller; cache_cell.key_iterator = keys.insert(keys.end(), local_path); diff --git a/src/IO/RemoteReadBufferCache.h b/src/IO/RemoteReadBufferCache.h index 20cb2eb8877..4bd1d758a41 100644 --- a/src/IO/RemoteReadBufferCache.h +++ b/src/IO/RemoteReadBufferCache.h @@ -16,6 +16,7 @@ #include #include #include +#include #include #include @@ -31,51 +32,20 @@ enum class RemoteReadBufferCacheError : int8_t END_OF_FILE = 20, }; -struct RemoteFileMetadata +class RemoteCacheController { - enum LocalStatus +public: + enum LocalFileStatus { TO_DOWNLOAD = 0, DOWNLOADING = 1, DOWNLOADED = 2, }; - RemoteFileMetadata(): last_modification_timestamp(0l), file_size(0), status(TO_DOWNLOAD){} - RemoteFileMetadata( - const String & schema_, - const String & cluster_, - const String & path_, - UInt64 last_modification_timestamp_, - size_t file_size_) - : schema(schema_) - , cluster(cluster_) - , remote_path(path_) - , last_modification_timestamp(last_modification_timestamp_) - , file_size(file_size_) - , status(TO_DOWNLOAD) - { - } - bool load(const std::filesystem::path & local_path); - void save(const std::filesystem::path & local_path) const; - String toString() const; - - String schema; // Hive, S2 etc. - String cluster; - String remote_path; - UInt64 last_modification_timestamp; - size_t file_size; - LocalStatus status; -}; - -class RemoteCacheController -{ -public: RemoteCacheController( - ContextPtr context, - const RemoteFileMetadata & file_meta_data_, + RemoteFileMetaDataBasePtr file_meta_data_, const std::filesystem::path & local_path_, - size_t cache_bytes_before_flush_, - std::shared_ptr read_buffer_); + size_t cache_bytes_before_flush_); ~RemoteCacheController(); // recover from local disk @@ -97,7 +67,7 @@ public: { std::lock_guard lock{mutex}; //return opened_file_streams.empty() && remote_read_buffer == nullptr; - return opened_file_buffer_refs.empty() && remote_read_buffer == nullptr; + return opened_file_buffer_refs.empty() && file_status == DOWNLOADED; } void close(); @@ -111,9 +81,10 @@ public: inline size_t size() const { return current_offset; } inline const std::filesystem::path & getLocalPath() { return local_path; } - inline const String & getRemotePath() const { return file_meta_data.remote_path; } + inline String getRemotePath() const { return file_meta_data_ptr->getRemotePath(); } - inline UInt64 getLastModificationTimestamp() const { return file_meta_data.last_modification_timestamp; } + inline UInt64 getLastModificationTimestamp() const { return file_meta_data_ptr->getLastModificationTimestamp(); } + bool checkFileChanged(RemoteFileMetaDataBasePtr file_meta_data_); inline void markInvalid() { std::lock_guard lock(mutex); @@ -124,29 +95,33 @@ public: std::lock_guard lock(mutex); return valid; } - const RemoteFileMetadata & getFileMetaData() { return file_meta_data; } + RemoteFileMetaDataBasePtr getFileMetaData() { return file_meta_data_ptr; } + + void startBackgroundDownload(std::shared_ptr input_readbuffer, BackgroundSchedulePool & thread_pool); private: - // flush file and meta info into disk - void flush(bool need_flush_meta_data_ = false); + // flush file and status information + void flush(bool need_flush_status = false); + bool loadInnerInformation(const std::filesystem::path & file_path); BackgroundSchedulePool::TaskHolder download_task_holder; - void backgroundDownload(); + void backgroundDownload(std::shared_ptr remote_read_buffer); std::mutex mutex; std::condition_variable more_data_signal; std::set opened_file_buffer_refs; // refer to a buffer address - // meta info - RemoteFileMetadata file_meta_data; + String meta_data_class; + LocalFileStatus file_status = TO_DOWNLOAD; // for tracking download process + RemoteFileMetaDataBasePtr file_meta_data_ptr; std::filesystem::path local_path; bool valid; size_t local_cache_bytes_read_before_flush; size_t current_offset; - std::shared_ptr remote_read_buffer; + //std::shared_ptr remote_read_buffer; std::unique_ptr data_file_writer; Poco::Logger * log = &Poco::Logger::get("RemoteCacheController"); @@ -162,13 +137,13 @@ class RemoteReadBuffer : public BufferWithOwnMemory public: explicit RemoteReadBuffer(size_t buff_size); ~RemoteReadBuffer() override; - static std::unique_ptr create(ContextPtr contex, const RemoteFileMetadata & remote_file_meta, std::unique_ptr read_buffer); + static std::unique_ptr create(ContextPtr contex, RemoteFileMetaDataBasePtr remote_file_meta_data, std::unique_ptr read_buffer); bool nextImpl() override; - inline bool seekable() { return !file_buffer && file_cache_controller->getFileMetaData().file_size > 0; } + inline bool seekable() { return !file_buffer && file_cache_controller->getFileMetaData()->getFileSize() > 0; } off_t seek(off_t off, int whence) override; off_t getPosition() override; - std::optional getTotalSize() override { return file_cache_controller->getFileMetaData().file_size; } + std::optional getTotalSize() override { return file_cache_controller->getFileMetaData()->getFileSize(); } private: std::shared_ptr file_cache_controller; @@ -190,7 +165,7 @@ public: inline bool isInitialized() const { return initialized; } std::pair - createReader(ContextPtr context, const RemoteFileMetadata & remote_file_meta, std::shared_ptr & read_buffer); + createReader(ContextPtr context, RemoteFileMetaDataBasePtr remote_file_meta_data, std::shared_ptr & read_buffer); void updateTotalSize(size_t size) { total_size += size; } @@ -217,7 +192,7 @@ private: std::list keys; std::map caches; - String calculateLocalPath(const RemoteFileMetadata & meta) const; + String calculateLocalPath(RemoteFileMetaDataBasePtr meta) const; BackgroundSchedulePool::TaskHolder recover_task_holder; void recoverTask(); diff --git a/src/Storages/Hive/HiveFileMetaData.cpp b/src/Storages/Hive/HiveFileMetaData.cpp new file mode 100644 index 00000000000..5045e4f0364 --- /dev/null +++ b/src/Storages/Hive/HiveFileMetaData.cpp @@ -0,0 +1,53 @@ +#include +#include +#include +#include + +namespace DB +{ +namespace ErrorCodes +{ + extern const int BAD_ARGUMENTS; +} + +HiveFileMetaData::~HiveFileMetaData() = default; + +String HiveFileMetaData::toString() +{ + Poco::JSON::Object jobj; + jobj.set("schema", schema); + jobj.set("cluster", cluster); + jobj.set("remote_path", remote_path); + jobj.set("last_modification_timestamp", last_modification_timestamp); + jobj.set("file_size", file_size); + std::stringstream buf; // STYLE_CHECK_ALLOW_STD_STRING_STREAM + jobj.stringify(buf); + return buf.str(); + +} + +bool HiveFileMetaData::fromString(const String &buf) +{ + std::stringstream istream; // STYLE_CHECK_ALLOW_STD_STRING_STREAM + istream << buf; + Poco::JSON::Parser parser; + auto jobj = parser.parse(istream).extract(); + remote_path = jobj->get("remote_path").convert(); + schema = jobj->get("schema").convert(); + cluster = jobj->get("cluster").convert(); + last_modification_timestamp = jobj->get("last_modification_timestamp").convert(); + file_size =jobj->get("file_size").convert(); + return true; +} + +bool HiveFileMetaData::equal(RemoteFileMetaDataBasePtr meta_data) +{ + auto real_meta_data = std::dynamic_pointer_cast(meta_data); + if (!real_meta_data) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid meta data class"); + return last_modification_timestamp == real_meta_data->last_modification_timestamp; +} + +REGISTTER_REMOTE_FILE_META_DATA_CLASS(HiveFileMetaData) + +} diff --git a/src/Storages/Hive/HiveFileMetaData.h b/src/Storages/Hive/HiveFileMetaData.h new file mode 100644 index 00000000000..c56fa4d1727 --- /dev/null +++ b/src/Storages/Hive/HiveFileMetaData.h @@ -0,0 +1,29 @@ +#pragma once +#include +namespace DB +{ +class HiveFileMetaData : public RemoteFileMetaDataBase +{ +public: + HiveFileMetaData() = default; + HiveFileMetaData(const String & schema_, + const String & cluster_, + const String & remote_path_, + size_t file_size_, + UInt64 last_modification_timestamp_): + RemoteFileMetaDataBase(schema_, cluster_, remote_path_, file_size_, last_modification_timestamp_){} + ~HiveFileMetaData() override; + + String getClassName() override { return "HiveFileMetaData"; } + + RemoteFileMetaDataBasePtr clone() override + { + auto result = std::make_shared(schema, cluster, remote_path, file_size, last_modification_timestamp); + return result; + } + String toString() override; + bool fromString(const String &buf) override; + bool equal(RemoteFileMetaDataBasePtr meta_data) override; + +}; +} diff --git a/src/Storages/Hive/StorageHive.cpp b/src/Storages/Hive/StorageHive.cpp index d0a0991a16a..54d692e48a6 100644 --- a/src/Storages/Hive/StorageHive.cpp +++ b/src/Storages/Hive/StorageHive.cpp @@ -38,6 +38,7 @@ #include #include #include +#include #include #include @@ -168,9 +169,10 @@ public: // Use local cache for remote filesystem if enabled. std::unique_ptr remote_read_buf; - if (RemoteReadBufferCache::instance().isInitialized() && getContext()->getSettingsRef().use_local_cache_for_remote_fs) + bool x = false; + if (RemoteReadBufferCache::instance().isInitialized() && getContext()->getSettingsRef().use_local_cache_for_remote_fs && x) remote_read_buf = RemoteReadBuffer::create(getContext(), - {"Hive", getNameNodeCluster(hdfs_namenode_url), uri_with_path, curr_file->getLastModTs(), curr_file->getSize()}, + std::make_shared("Hive", getNameNodeCluster(hdfs_namenode_url), uri_with_path, curr_file->getLastModTs(), curr_file->getSize()), std::move(raw_read_buf)); else remote_read_buf = std::move(raw_read_buf);