This commit is contained in:
KinderRiven 2022-04-30 22:38:34 +08:00
parent 01210c8d38
commit e1acacf831
5 changed files with 30 additions and 20 deletions

View File

@@ -11,7 +11,6 @@
#include <IO/Operators.h>
#include <pcg-random/pcg_random.hpp>
#include <filesystem>
#include <Interpreters/Context.h>
namespace fs = std::filesystem;

View File

@@ -10,7 +10,6 @@
#include <boost/noncopyable.hpp>
#include <map>
#include <Interpreters/FilesystemCacheLog.h>
#include "FileCache_fwd.h"
#include <Common/logger_useful.h>
#include <Common/FileSegment.h>

View File

@@ -49,9 +49,8 @@ CachedReadBufferFromRemoteFS::CachedReadBufferFromRemoteFS(
, read_until_position(read_until_position_)
, remote_file_reader_creator(remote_file_reader_creator_)
, query_id(getQueryId())
, enable_logging(!query_id.empty() && CurrentThread::get().getQueryContext()->getSettingsRef().enable_filesystem_cache_log)
{
if (!query_id.empty() && (CurrentThread::get().getQueryContext()->getSettingsRef().enable_filesystem_cache_log))
enable_logging = true;
}
void CachedReadBufferFromRemoteFS::appendFilesystemCacheLog(
@@ -120,7 +119,8 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getRemoteFSReadBuffer(FileSe
{
switch (read_type_)
{
case ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE: {
case ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE:
{
/**
* Each downloader is elected to download at most buffer_size bytes and then any other can
* continue. The one who continues download should reuse download buffer.
@@ -147,7 +147,8 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getRemoteFSReadBuffer(FileSe
return remote_fs_segment_reader;
}
case ReadType::REMOTE_FS_READ_BYPASS_CACHE: {
case ReadType::REMOTE_FS_READ_BYPASS_CACHE:
{
/// Result buffer is owned only by current buffer -- not shareable like in the case above.
if (remote_file_reader && remote_file_reader->getFileOffsetOfBufferEnd() == file_offset_of_buffer_end)
@@ -188,11 +189,13 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getReadBufferForFileSegment(
{
switch (download_state)
{
case FileSegment::State::SKIP_CACHE: {
case FileSegment::State::SKIP_CACHE:
{
read_type = ReadType::REMOTE_FS_READ_BYPASS_CACHE;
return getRemoteFSReadBuffer(file_segment, read_type);
}
case FileSegment::State::EMPTY: {
case FileSegment::State::EMPTY:
{
auto downloader_id = file_segment->getOrSetDownloader();
if (downloader_id == file_segment->getCallerId())
{
@@ -223,7 +226,8 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getReadBufferForFileSegment(
continue;
}
}
case FileSegment::State::DOWNLOADING: {
case FileSegment::State::DOWNLOADING:
{
size_t download_offset = file_segment->getDownloadOffset();
bool can_start_from_cache = download_offset > file_offset_of_buffer_end;
@@ -253,11 +257,13 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getReadBufferForFileSegment(
continue;
}
case FileSegment::State::DOWNLOADED: {
case FileSegment::State::DOWNLOADED:
{
read_type = ReadType::CACHED;
return getCacheReadBuffer(range.left);
}
case FileSegment::State::PARTIALLY_DOWNLOADED: {
case FileSegment::State::PARTIALLY_DOWNLOADED:
{
auto downloader_id = file_segment->getOrSetDownloader();
if (downloader_id == file_segment->getCallerId())
{
@@ -306,7 +312,8 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getReadBufferForFileSegment(
download_state = file_segment->state();
continue;
}
case FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION: {
case FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION:
{
size_t download_offset = file_segment->getDownloadOffset();
bool can_start_from_cache = download_offset > file_offset_of_buffer_end;
@@ -352,7 +359,8 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getImplementationBuffer(File
switch (read_type)
{
case ReadType::CACHED: {
case ReadType::CACHED:
{
#ifndef NDEBUG
auto * file_reader = assert_cast<ReadBufferFromFile *>(read_buffer_for_file_segment.get());
size_t file_size = file_reader->size();
@@ -381,11 +389,13 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getImplementationBuffer(File
break;
}
case ReadType::REMOTE_FS_READ_BYPASS_CACHE: {
case ReadType::REMOTE_FS_READ_BYPASS_CACHE:
{
read_buffer_for_file_segment->seek(file_offset_of_buffer_end, SEEK_SET);
break;
}
case ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE: {
case ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE:
{
assert(file_segment->isDownloader());
if (bytes_to_predownload)
@@ -787,15 +797,18 @@ bool CachedReadBufferFromRemoteFS::nextImplStep()
switch (read_type)
{
case ReadType::CACHED: {
case ReadType::CACHED:
{
ProfileEvents::increment(ProfileEvents::RemoteFSCacheReadBytes, size);
break;
}
case ReadType::REMOTE_FS_READ_BYPASS_CACHE: {
case ReadType::REMOTE_FS_READ_BYPASS_CACHE:
{
ProfileEvents::increment(ProfileEvents::RemoteFSReadBytes, size);
break;
}
case ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE: {
case ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE:
{
ProfileEvents::increment(ProfileEvents::RemoteFSReadBytes, size);
ProfileEvents::increment(ProfileEvents::RemoteFSCacheDownloadBytes, size);
break;

View File

@@ -65,7 +65,7 @@ private:
size_t getTotalSizeToRead();
bool completeFileSegmentAndGetNext();
void appendFilesystemCacheLog(const FileSegment::Range &file_segment_range, ReadType read_type);
void appendFilesystemCacheLog(const FileSegment::Range & file_segment_range, ReadType read_type);
Poco::Logger * log;
IFileCache::Key cache_key;

View File

@@ -26,7 +26,6 @@
#include <memory>
#include <mutex>
#include <optional>
#include <atomic>
#include <thread>
#include <exception>