fix code stylke

This commit is contained in:
taiyang-li 2021-11-29 14:50:33 +08:00
parent fd4462db64
commit 83be8d28e6
2 changed files with 26 additions and 24 deletions

View File

@ -118,7 +118,7 @@ RemoteReadBufferCacheError RemoteCacheController::waitMoreData(size_t start_offs
return RemoteReadBufferCacheError::OK; return RemoteReadBufferCacheError::OK;
} }
else else
more_data_signal.wait(lock, [this, end_offset_] { return this->download_finished || this->current_offset >= end_offset_; }); more_data_signal.wait(lock, [this, end_offset_] { return download_finished || current_offset >= end_offset_; });
} }
lock.unlock(); lock.unlock();
return RemoteReadBufferCacheError::OK; return RemoteReadBufferCacheError::OK;
@ -353,12 +353,12 @@ off_t RemoteReadBuffer::seek(off_t offset, int whence)
if (whence == SEEK_SET) if (whence == SEEK_SET)
new_pos = offset; new_pos = offset;
else if (whence == SEEK_CUR) else if (whence == SEEK_CUR)
new_pos = pos_in_file - (working_buffer.end() - pos) + offset; new_pos = pos_in_file - available() + offset;
else else
throw Exception("expects SEEK_SET or SEEK_CUR as whence", ErrorCodes::BAD_ARGUMENTS); throw Exception("expects SEEK_SET or SEEK_CUR as whence", ErrorCodes::BAD_ARGUMENTS);
/// Position is unchanged. /// Position is unchanged.
if (new_pos + (working_buffer.end() - pos) == pos_in_file) if (off_t(new_pos + available()) == pos_in_file)
return new_pos; return new_pos;
if (new_pos <= pos_in_file && new_pos >= pos_in_file - static_cast<off_t>(working_buffer.size())) if (new_pos <= pos_in_file && new_pos >= pos_in_file - static_cast<off_t>(working_buffer.size()))
@ -440,16 +440,16 @@ void RemoteReadBufferCache::initOnce(
std::filesystem::path root_dir(local_path_prefix); std::filesystem::path root_dir(local_path_prefix);
if (!std::filesystem::exists(root_dir)) if (!std::filesystem::exists(root_dir))
{ {
LOG_INFO(log, "{} not exists. this cache will be disable", local_path_prefix); LOG_INFO(log, "Path {} not exists. this cache will be disable", local_path_prefix);
return; return;
} }
auto recover_task = [this, root_dir]() { auto recover_task = [this, root_dir]() {
auto callback = [this](RemoteCacheController * cntrl) { this->total_size += cntrl->size(); }; auto callback = [this](RemoteCacheController * cntrl) { total_size += cntrl->size(); };
std::lock_guard lock(this->mutex); std::lock_guard lock(mutex);
// two level dir. /<first 3 chars of path hash code>/<path hash code> // two level dir. /<first 3 chars of path hash code>/<path hash code>
recoverCachedFilesMeta(root_dir, 1, 2, callback); recoverCachedFilesMeta(root_dir, 1, 2, callback);
this->initialized = true; initialized = true;
LOG_TRACE(this->log, "recovered from disk "); LOG_TRACE(log, "Recovered from disk ");
}; };
getThreadPool()->scheduleOrThrow(recover_task); getThreadPool()->scheduleOrThrow(recover_task);
} }
@ -485,9 +485,10 @@ RemoteReadBufferCache::createReader(const RemoteFileMetadata & remote_file_meta,
{ {
LOG_TRACE( LOG_TRACE(
log, log,
"remote file has been updated. " + remote_path + ":" "Remote file ({}) has been updated. Last saved modification time: {}, actual last modification time: {}",
+ std::to_string(cache_iter->second.cache_controller->getLastModificationTimestamp()) + "->" remote_path,
+ std::to_string(last_modification_timestamp)); std::to_string(cache_iter->second.cache_controller->getLastModificationTimestamp()),
std::to_string(last_modification_timestamp));
cache_iter->second.cache_controller->markInvalid(); cache_iter->second.cache_controller->markInvalid();
} }
else else
@ -522,35 +523,36 @@ RemoteReadBufferCache::createReader(const RemoteFileMetadata & remote_file_meta,
// reach the disk capacity limit // reach the disk capacity limit
if (!clear_ret) if (!clear_ret)
{ {
LOG_ERROR(log, "local cache is full, return nullptr"); LOG_INFO(log, "Reached local cache capacity limit size ({})", limit_size);
return {nullptr, RemoteReadBufferCacheError::DISK_FULL}; return {nullptr, RemoteReadBufferCacheError::DISK_FULL};
} }
std::filesystem::create_directories(local_path); std::filesystem::create_directories(local_path);
auto callback = [this](RemoteCacheController * cntrl) { this->total_size += cntrl->size(); }; auto callback = [this](RemoteCacheController * cntrl) { total_size += cntrl->size(); };
auto cache_cntrl auto cache_controller
= std::make_shared<RemoteCacheController>(remote_file_meta, local_path, local_cache_bytes_read_before_flush, read_buffer, callback); = std::make_shared<RemoteCacheController>(remote_file_meta, local_path, local_cache_bytes_read_before_flush, read_buffer, callback);
CacheCell cc; CacheCell cache_cell;
cc.cache_controller = cache_cntrl; cache_cell.cache_controller = cache_controller;
cc.key_iterator = keys.insert(keys.end(), local_path); cache_cell.key_iterator = keys.insert(keys.end(), local_path);
caches[local_path] = cc; caches[local_path] = cache_cell;
return {std::make_shared<LocalCachedFileReader>(cache_cntrl.get(), file_size), RemoteReadBufferCacheError::OK}; return {std::make_shared<LocalCachedFileReader>(cache_controller.get(), file_size), RemoteReadBufferCacheError::OK};
} }
bool RemoteReadBufferCache::clearLocalCache() bool RemoteReadBufferCache::clearLocalCache()
{ {
for (auto it = keys.begin(); it != keys.end();) for (auto it = keys.begin(); it != keys.end();)
{ {
// TODO keys is not thread-safe
auto cache_it = caches.find(*it); auto cache_it = caches.find(*it);
auto cntrl = cache_it->second.cache_controller; auto cache_controller = cache_it->second.cache_controller;
if (!cntrl->isValid() && cntrl->closable()) if (!cache_controller->isValid() && cache_controller->closable())
{ {
LOG_TRACE(log, "clear invalid cache: " + *it); LOG_TRACE(log, "Clear invalid cache entry with key {}", *it);
total_size total_size
= total_size > cache_it->second.cache_controller->size() ? total_size - cache_it->second.cache_controller->size() : 0; = total_size > cache_it->second.cache_controller->size() ? total_size - cache_it->second.cache_controller->size() : 0;
cntrl->close(); cache_controller->close();
it = keys.erase(it); it = keys.erase(it);
caches.erase(cache_it); caches.erase(cache_it);
} }

View File

@ -56,7 +56,7 @@ public:
size_t cache_bytes_before_flush_, size_t cache_bytes_before_flush_,
std::shared_ptr<ReadBuffer> readbuffer_, std::shared_ptr<ReadBuffer> readbuffer_,
std::function<void(RemoteCacheController *)> const & finish_callback); std::function<void(RemoteCacheController *)> const & finish_callback);
~RemoteCacheController(); // the local files will be deleted in descontructor ~RemoteCacheController();
// recover from local disk // recover from local disk
static std::shared_ptr<RemoteCacheController> static std::shared_ptr<RemoteCacheController>