diff --git a/src/IO/RemoteReadBufferCache.cpp b/src/IO/RemoteReadBufferCache.cpp index b93de46398c..057a896d8e6 100644 --- a/src/IO/RemoteReadBufferCache.cpp +++ b/src/IO/RemoteReadBufferCache.cpp @@ -1,16 +1,16 @@ +#include +#include +#include #include -#include -#include -#include -#include #include #include #include #include -#include "Common/Exception.h" -#include -#include -#include +#include +#include +#include +#include +#include namespace DB { @@ -21,10 +21,8 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } -std::shared_ptr -RemoteCacheController::recover( - const std::filesystem::path & local_path_, - std::function const & finish_callback) +std::shared_ptr RemoteCacheController::recover( + const std::filesystem::path & local_path_, std::function const & finish_callback) { const auto & dir_handle = local_path_; std::filesystem::path data_file = local_path_ / "data.bin"; @@ -50,7 +48,7 @@ RemoteCacheController::recover( } auto file_size = std::filesystem::file_size(data_file); - RemoteFileMeta remote_file_meta (schema, cluster, remote_path, modification_ts, file_size); + RemoteFileMeta remote_file_meta(schema, cluster, remote_path, modification_ts, file_size); auto cntrl = std::make_shared(remote_file_meta, local_path_, 0, nullptr, finish_callback); cntrl->download_finished = true; cntrl->current_offset = file_size; @@ -89,7 +87,7 @@ RemoteCacheController::RemoteCacheController( jobj.set("remote_path", remote_path); jobj.set("downloaded", "false"); jobj.set("last_modification_timestamp", last_modification_timestamp); - std::stringstream buf;// STYLE_CHECK_ALLOW_STD_STRING_STREAM + std::stringstream buf; // STYLE_CHECK_ALLOW_STD_STRING_STREAM jobj.stringify(buf); std::ofstream meta_file(local_path_ / "meta.txt", std::ios::out); meta_file.write(buf.str().c_str(), buf.str().size()); @@ -127,8 +125,7 @@ RemoteReadBufferCacheError RemoteCacheController::waitMoreData(size_t start_offs void RemoteCacheController::backgroupDownload(std::function const & finish_callback) { - auto task = [this, finish_callback]() - { + auto task = [this, finish_callback]() { size_t unflush_bytes = 0; size_t total_bytes = 0; while (!remote_readbuffer->eof()) @@ -163,7 +160,9 @@ void RemoteCacheController::backgroupDownload(std::functionscheduleOrThrow(task); } @@ -183,7 +182,7 @@ void RemoteCacheController::flush(bool need_flush_meta_) jobj.set("remote_path", remote_path); jobj.set("downloaded", download_finished ? "true" : "false"); jobj.set("last_modification_timestamp", last_modification_timestamp); - std::stringstream buf;// STYLE_CHECK_ALLOW_STD_STRING_STREAM + std::stringstream buf; // STYLE_CHECK_ALLOW_STD_STRING_STREAM jobj.stringify(buf); std::ofstream meta_file(local_path / "meta.txt", std::ios::out); @@ -203,7 +202,7 @@ std::tuple RemoteCacheController::allocFile() { std::filesystem::path result_local_path; if (download_finished) - result_local_path = local_path / "data.bin"; + result_local_path = local_path / "data.bin"; FILE * fs = fopen((local_path / "data.bin").string().c_str(), "r"); if (fs == nullptr) return {fs, result_local_path}; @@ -284,9 +283,7 @@ RemoteReadBuffer::RemoteReadBuffer(size_t buff_size) : BufferWithOwnMemory RemoteReadBuffer::create( - const RemoteFileMeta &remote_file_meta_, - std::unique_ptr readbuffer) +std::unique_ptr RemoteReadBuffer::create(const RemoteFileMeta & remote_file_meta_, std::unique_ptr readbuffer) { size_t buff_size = DBMS_DEFAULT_BUFFER_SIZE; if (readbuffer != nullptr) @@ -313,8 +310,7 @@ std::unique_ptr RemoteReadBuffer::create( if (retry > 0) usleep(20 * retry); - std::tie(rrb->file_reader, error) - = RemoteReadBufferCache::instance().createReader(remote_file_meta_, srb); + std::tie(rrb->file_reader, error) = RemoteReadBufferCache::instance().createReader(remote_file_meta_, srb); retry++; } while (error == RemoteReadBufferCacheError::FILE_INVALID && retry < 10); if (rrb->file_reader == nullptr) @@ -403,10 +399,10 @@ RemoteReadBufferCache & RemoteReadBufferCache::instance() } void RemoteReadBufferCache::recoverCachedFilesMeta( - const std::filesystem::path & current_path, - size_t current_depth, - size_t max_depth, - std::function const & finish_callback) + const std::filesystem::path & current_path, + size_t current_depth, + size_t max_depth, + std::function const & finish_callback) { if (current_depth >= max_depth) { @@ -416,24 +412,21 @@ void RemoteReadBufferCache::recoverCachedFilesMeta( auto cache_controller = RemoteCacheController::recover(path, finish_callback); if (!cache_controller) continue; - auto &cell = caches[path]; + auto & cell = caches[path]; cell.cache_controller = cache_controller; cell.key_iterator = keys.insert(keys.end(), path); } return; } - for (auto const &dir : std::filesystem::directory_iterator{current_path}) + for (auto const & dir : std::filesystem::directory_iterator{current_path}) { recoverCachedFilesMeta(dir.path(), current_depth + 1, max_depth, finish_callback); } - } -void RemoteReadBufferCache::initOnce(const std::filesystem::path & dir, - size_t limit_size_, - size_t bytes_read_before_flush_, - size_t max_threads) +void RemoteReadBufferCache::initOnce( + const std::filesystem::path & dir, size_t limit_size_, size_t bytes_read_before_flush_, size_t max_threads) { LOG_TRACE(log, "init local cache. path: {}, limit {}", dir.string(), limit_size_); local_path_prefix = dir; @@ -448,8 +441,7 @@ void RemoteReadBufferCache::initOnce(const std::filesystem::path & dir, LOG_INFO(log, "{} not exists. this cache will be disable", local_path_prefix); return; } - auto recover_task = [this, root_dir]() - { + auto recover_task = [this, root_dir]() { auto callback = [this](RemoteCacheController * cntrl) { this->total_size += cntrl->size(); }; std::lock_guard lock(this->mutex); // two level dir. // @@ -465,12 +457,11 @@ std::filesystem::path RemoteReadBufferCache::calculateLocalPath(const RemoteFile std::string full_path = meta.schema + ":" + meta.cluster + ":" + meta.path; UInt128 hashcode = sipHash128(full_path.c_str(), full_path.size()); std::string hashcode_str = getHexUIntLowercase(hashcode); - return std::filesystem::path(local_path_prefix) / hashcode_str.substr(0,3) / hashcode_str; + return std::filesystem::path(local_path_prefix) / hashcode_str.substr(0, 3) / hashcode_str; } -std::tuple, RemoteReadBufferCacheError> RemoteReadBufferCache::createReader( - const RemoteFileMeta &remote_file_meta, - std::shared_ptr & readbuffer) +std::tuple, RemoteReadBufferCacheError> +RemoteReadBufferCache::createReader(const RemoteFileMeta & remote_file_meta, std::shared_ptr & readbuffer) { // If something is wrong on startup, rollback to read from the original ReadBuffer if (!hasInitialized()) @@ -490,8 +481,10 @@ std::tuple, RemoteReadBufferCacheError> R // if the file has been update on remote side, we need to redownload it if (cache_iter->second.cache_controller->getLastModificationTimestamp() != last_modification_timestamp) { - LOG_TRACE(log, - "remote file has been updated. " + remote_path + ":" + std::to_string(cache_iter->second.cache_controller->getLastModificationTimestamp()) + "->" + LOG_TRACE( + log, + "remote file has been updated. " + remote_path + ":" + + std::to_string(cache_iter->second.cache_controller->getLastModificationTimestamp()) + "->" + std::to_string(last_modification_timestamp)); cache_iter->second.cache_controller->markInvalid(); } @@ -499,7 +492,9 @@ std::tuple, RemoteReadBufferCacheError> R { // move the key to the list end keys.splice(keys.end(), keys, cache_iter->second.key_iterator); - return {std::make_shared(cache_iter->second.cache_controller.get(), file_size), RemoteReadBufferCacheError::OK}; + return { + std::make_shared(cache_iter->second.cache_controller.get(), file_size), + RemoteReadBufferCacheError::OK}; } } @@ -511,7 +506,9 @@ std::tuple, RemoteReadBufferCacheError> R { // move the key to the list end, this case should not happen? keys.splice(keys.end(), keys, cache_iter->second.key_iterator); - return {std::make_shared(cache_iter->second.cache_controller.get(), file_size), RemoteReadBufferCacheError::OK}; + return { + std::make_shared(cache_iter->second.cache_controller.get(), file_size), + RemoteReadBufferCacheError::OK}; } else { @@ -530,7 +527,8 @@ std::tuple, RemoteReadBufferCacheError> R std::filesystem::create_directories(local_path); auto callback = [this](RemoteCacheController * cntrl) { this->total_size += cntrl->size(); }; - auto cache_cntrl = std::make_shared(remote_file_meta, local_path, local_cache_bytes_read_before_flush, readbuffer, callback); + auto cache_cntrl + = std::make_shared(remote_file_meta, local_path, local_cache_bytes_read_before_flush, readbuffer, callback); CacheCell cc; cc.cache_controller = cache_cntrl; cc.key_iterator = keys.insert(keys.end(), local_path); @@ -548,7 +546,8 @@ bool RemoteReadBufferCache::clearLocalCache() if (!cntrl->isValid() && cntrl->closable()) { LOG_TRACE(log, "clear invalid cache: " + *it); - total_size = total_size > cache_it->second.cache_controller->size() ? total_size - cache_it->second.cache_controller->size() : 0; + total_size + = total_size > cache_it->second.cache_controller->size() ? total_size - cache_it->second.cache_controller->size() : 0; cntrl->close(); it = keys.erase(it); caches.erase(cache_it); @@ -568,12 +567,18 @@ bool RemoteReadBufferCache::clearLocalCache() } if (cache_it->second.cache_controller->closable()) { - total_size = total_size > cache_it->second.cache_controller->size() ? total_size - cache_it->second.cache_controller->size() : 0; + total_size + = total_size > cache_it->second.cache_controller->size() ? total_size - cache_it->second.cache_controller->size() : 0; cache_it->second.cache_controller->close(); caches.erase(cache_it); it = keys.erase(it); - LOG_TRACE(log, "clear local file {} for {}. key size:{}. next{}", cache_it->second.cache_controller->getLocalPath().string(), - cache_it->second.cache_controller->getRemotePath(), keys.size(), *it); + LOG_TRACE( + log, + "clear local file {} for {}. key size:{}. next{}", + cache_it->second.cache_controller->getLocalPath().string(), + cache_it->second.cache_controller->getRemotePath(), + keys.size(), + *it); } else break; diff --git a/src/IO/RemoteReadBufferCache.h b/src/IO/RemoteReadBufferCache.h index bc270aef401..b185fc57b47 100644 --- a/src/IO/RemoteReadBufferCache.h +++ b/src/IO/RemoteReadBufferCache.h @@ -14,7 +14,7 @@ namespace DB { -enum class RemoteReadBufferCacheError :int8_t +enum class RemoteReadBufferCacheError : int8_t { OK, NOT_INIT = 10, @@ -27,17 +27,14 @@ enum class RemoteReadBufferCacheError :int8_t struct RemoteFileMeta { RemoteFileMeta( - const std::string & schema_, - const std::string & cluster_, - const std::string & path_, - UInt64 last_modification_timestamp_, - size_t file_size_): - schema(schema_), - cluster(cluster_), - path(path_), - last_modification_timestamp(last_modification_timestamp_), - file_size(file_size_) - {} + const std::string & schema_, + const std::string & cluster_, + const std::string & path_, + UInt64 last_modification_timestamp_, + size_t file_size_) + : schema(schema_), cluster(cluster_), path(path_), last_modification_timestamp(last_modification_timestamp_), file_size(file_size_) + { + } std::string schema; // Hive, S2 etc. std::string cluster; @@ -53,7 +50,7 @@ class RemoteCacheController { public: RemoteCacheController( - const RemoteFileMeta &meta, + const RemoteFileMeta & meta, const std::filesystem::path & local_path_, size_t cache_bytes_before_flush_, std::shared_ptr readbuffer_, @@ -167,9 +164,7 @@ class RemoteReadBuffer : public BufferWithOwnMemory public: explicit RemoteReadBuffer(size_t buff_size); ~RemoteReadBuffer() override; - static std::unique_ptr create( - const RemoteFileMeta &remote_file_meta_, - std::unique_ptr readbuffer); + static std::unique_ptr create(const RemoteFileMeta & remote_file_meta_, std::unique_ptr readbuffer); bool nextImpl() override; inline bool seekable() { return file_reader != nullptr && file_reader->size() > 0; } @@ -191,14 +186,13 @@ public: ~RemoteReadBufferCache(); // global instance static RemoteReadBufferCache & instance(); - std::shared_ptr GetThreadPool(){ return threadPool; } + std::shared_ptr GetThreadPool() { return threadPool; } void initOnce(const std::filesystem::path & dir, size_t limit_size, size_t bytes_read_before_flush_, size_t max_threads); inline bool hasInitialized() const { return inited; } - std::tuple, RemoteReadBufferCacheError> createReader( - const RemoteFileMeta & remote_file_meta, - std::shared_ptr & readbuffer); + std::tuple, RemoteReadBufferCacheError> + createReader(const RemoteFileMeta & remote_file_meta, std::shared_ptr & readbuffer); private: std::string local_path_prefix; @@ -219,13 +213,13 @@ private: std::list keys; std::map caches; - std::filesystem::path calculateLocalPath(const RemoteFileMeta &meta); + std::filesystem::path calculateLocalPath(const RemoteFileMeta & meta); void recoverCachedFilesMeta( - const std::filesystem::path & current_path, - size_t current_depth, - size_t max_depth, - std::function const & finish_callback); + const std::filesystem::path & current_path, + size_t current_depth, + size_t max_depth, + std::function const & finish_callback); bool clearLocalCache(); }; diff --git a/src/Storages/Hive/HiveCommon.cpp b/src/Storages/Hive/HiveCommon.cpp index 774ed78c44c..da9e2b31abc 100644 --- a/src/Storages/Hive/HiveCommon.cpp +++ b/src/Storages/Hive/HiveCommon.cpp @@ -9,7 +9,7 @@ namespace ErrorCodes extern const int BAD_ARGUMENTS; } -std::shared_ptr HiveMetastoreClient::getTableMeta(const std::string & db_name, const std::string & table_name) +std::shared_ptr HiveMetastoreClient::getTableMetadata(const std::string & db_name, const std::string & table_name) { LOG_TRACE(log, "get table meta:" + db_name + ":" + table_name); std::lock_guard lock{mutex}; @@ -32,7 +32,7 @@ std::shared_ptr HiveMetastoreClient::getTabl } std::string cache_key = db_name + "." + table_name; - std::shared_ptr result = table_meta_cache.get(cache_key); + std::shared_ptr result = table_meta_cache.get(cache_key); bool update_cache = false; std::map old_partition_infos; std::map partition_infos; @@ -49,15 +49,15 @@ std::shared_ptr HiveMetastoreClient::getTabl for (const auto & partition : partitions) { - auto & pinfo = partition_infos[partition.sd.location]; - pinfo.partition = partition; + auto & partition_info = partition_infos[partition.sd.location]; + partition_info.partition = partition; // query files under the partition by hdfs api is costly, we reuse the files in case the partition has no change if (result) { auto it = old_partition_infos.find(partition.sd.location); if (it != old_partition_infos.end() && it->second.equal(partition)) - pinfo.files = it->second.files; + partition_info.files = it->second.files; else update_cache = true; } @@ -72,7 +72,7 @@ std::shared_ptr HiveMetastoreClient::getTabl if (update_cache) { LOG_INFO(log, "reload hive partition meta info:" + db_name + ":" + table_name); - result = std::make_shared(db_name, table_name, table, std::move(partition_infos), getContext()); + result = std::make_shared(db_name, table_name, table, std::move(partition_infos), getContext()); table_meta_cache.set(cache_key, result); } return result; @@ -82,7 +82,7 @@ void HiveMetastoreClient::clearTableMeta(const std::string & db_name, const std: { std::lock_guard lock{mutex}; std::string cache_key = db_name + "." + table_name; - std::shared_ptr meta = table_meta_cache.get(cache_key); + std::shared_ptr meta = table_meta_cache.get(cache_key); if (meta) table_meta_cache.set(cache_key, nullptr); } @@ -107,7 +107,7 @@ bool HiveMetastoreClient::PartitionInfo::equal(const Apache::Hadoop::Hive::Parti return (it1 == partition.parameters.end() && it2 == other.parameters.end()); } -std::vector HiveMetastoreClient::HiveTableMeta::getPartitions() +std::vector HiveMetastoreClient::HiveTableMetadata::getPartitions() { std::vector result; @@ -117,7 +117,7 @@ std::vector HiveMetastoreClient::HiveTableMeta: return result; } -std::vector HiveMetastoreClient::HiveTableMeta::getLocationFiles(const std::string & location) +std::vector HiveMetastoreClient::HiveTableMetadata::getLocationFiles(const std::string & location) { std::map::const_iterator it; if (!empty_partition_keys) @@ -153,7 +153,7 @@ std::vector HiveMetastoreClient::HiveTableMeta::g return result; } -std::vector HiveMetastoreClient::HiveTableMeta::getLocationFiles(const HDFSFSPtr & fs, const std::string & location) +std::vector HiveMetastoreClient::HiveTableMetadata::getLocationFiles(const HDFSFSPtr & fs, const std::string & location) { std::map::const_iterator it; if (!empty_partition_keys) diff --git a/src/Storages/Hive/HiveCommon.h b/src/Storages/Hive/HiveCommon.h index e70a4c8871d..af056739b6b 100644 --- a/src/Storages/Hive/HiveCommon.h +++ b/src/Storages/Hive/HiveCommon.h @@ -37,10 +37,10 @@ public: }; // use for speeding up query metadata - struct HiveTableMeta : public WithContext + struct HiveTableMetadata : public WithContext { public: - HiveTableMeta( + HiveTableMetadata( const std::string & db_name_, const std::string & table_name_, std::shared_ptr table_, @@ -79,7 +79,7 @@ public: { } - std::shared_ptr getTableMeta(const std::string & db_name, const std::string & table_name); + std::shared_ptr getTableMetadata(const std::string & db_name, const std::string & table_name); void clearTableMeta(const std::string & db_name, const std::string & table_name); void setClient(std::shared_ptr client_); inline bool isExpired() const { return expired; } @@ -88,7 +88,7 @@ public: private: std::shared_ptr client; - LRUCache table_meta_cache; + LRUCache table_meta_cache; mutable std::mutex mutex; std::atomic expired{false};