mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 09:32:06 +00:00
impl system.filesystem_cache_log
This commit is contained in:
parent
d595df1604
commit
d80aa0fd17
@ -41,70 +41,6 @@ IFileCache::IFileCache(
|
||||
{
|
||||
}
|
||||
|
||||
void IFileCache::addFilesystemCacheLogRef(const String & query_id)
|
||||
{
|
||||
/// must be a client query
|
||||
assert(!query_id.empty());
|
||||
|
||||
std::lock_guard cache_lock(logs_mutex);
|
||||
auto iter = cache_logs.find(query_id);
|
||||
if (iter == cache_logs.end())
|
||||
{
|
||||
iter = cache_logs.insert({query_id, std::make_shared<FilesystemCacheLogsRecorder>()}).first;
|
||||
iter->second->logs = std::make_shared<FilesystemCacheLogs>();
|
||||
}
|
||||
iter->second->ref++;
|
||||
}
|
||||
|
||||
void IFileCache::decFilesystemCacheLogRef(const String & query_id)
|
||||
{
|
||||
/// must be a client query
|
||||
assert(!query_id.empty());
|
||||
|
||||
std::lock_guard cache_lock(logs_mutex);
|
||||
auto iter = cache_logs.find(query_id);
|
||||
if (iter != cache_logs.end())
|
||||
{
|
||||
const auto & query_record = iter->second;
|
||||
query_record->ref--;
|
||||
|
||||
if (!query_record->ref)
|
||||
{
|
||||
if (auto cache_log = Context::getGlobalContextInstance()->getFilesystemCacheLog())
|
||||
{
|
||||
for (const auto & elem : *(query_record->logs))
|
||||
cache_log->add(*elem.second);
|
||||
}
|
||||
cache_logs.erase(iter);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void IFileCache::updateFilesystemCacheLog(const String & query_id, const String & remote_fs_path, size_t hit_count, size_t miss_count)
|
||||
{
|
||||
/// must be a client query
|
||||
assert(!query_id.empty());
|
||||
|
||||
std::lock_guard cache_lock(logs_mutex);
|
||||
auto iter = cache_logs.find(query_id);
|
||||
if (iter != cache_logs.end())
|
||||
{
|
||||
auto logs = iter->second->logs;
|
||||
auto elem = logs->find(remote_fs_path);
|
||||
if (elem == logs->end())
|
||||
{
|
||||
elem = logs->insert({remote_fs_path, std::make_shared<FilesystemCacheLogElement>()}).first;
|
||||
|
||||
const auto current_time = std::chrono::system_clock::now();
|
||||
elem->second->event_time = std::chrono::system_clock::to_time_t(current_time);
|
||||
elem->second->query_id = query_id;
|
||||
elem->second->remote_file_path = remote_fs_path;
|
||||
}
|
||||
elem->second->hit_count += hit_count;
|
||||
elem->second->miss_count += miss_count;
|
||||
}
|
||||
}
|
||||
|
||||
IFileCache::Key IFileCache::hash(const String & path)
|
||||
{
|
||||
return sipHash128(path.data(), path.size());
|
||||
|
@ -31,7 +31,6 @@ friend struct FileSegmentsHolder;
|
||||
public:
|
||||
using Key = UInt128;
|
||||
using Downloader = std::unique_ptr<SeekableReadBuffer>;
|
||||
using QueryCacheLogs = std::unordered_map<String, std::shared_ptr<FilesystemCacheLogsRecorder>>;
|
||||
|
||||
IFileCache(
|
||||
const String & cache_base_path_,
|
||||
@ -92,13 +91,6 @@ public:
|
||||
/// For debug.
|
||||
virtual String dumpStructure(const Key & key) = 0;
|
||||
|
||||
void addFilesystemCacheLogRef(const String & query_id);
|
||||
|
||||
void decFilesystemCacheLogRef(const String & query_id);
|
||||
|
||||
void recordFilesystemCacheLog(
|
||||
const String & query_id, const String & remote_fs_path, std::pair<size_t, size_t> & range, FilesystemCacheLogElement::ReadType);
|
||||
|
||||
protected:
|
||||
String cache_base_path;
|
||||
size_t max_size;
|
||||
@ -111,8 +103,6 @@ protected:
|
||||
|
||||
mutable std::mutex logs_mutex;
|
||||
|
||||
QueryCacheLogs cache_logs;
|
||||
|
||||
virtual bool tryReserve(
|
||||
const Key & key, size_t offset, size_t size,
|
||||
std::lock_guard<std::mutex> & cache_lock) = 0;
|
||||
|
@ -557,7 +557,7 @@ class IColumn;
|
||||
M(Bool, enable_filesystem_cache, true, "Use cache for remote filesystem. This setting does not turn on/off cache for disks (must me done via disk config), but allows to bypass cache for some queries if intended", 0) \
|
||||
M(UInt64, filesystem_cache_max_wait_sec, 5, "Allow to wait at most this number of seconds for download of current remote_fs_buffer_size bytes, and skip cache if exceeded", 0) \
|
||||
M(Bool, enable_filesystem_cache_on_write_operations, false, "Write into cache on write operations. To actually work this setting requires be added to disk config too", 0) \
|
||||
M(Bool, enable_cache_log, true, "Allows to record the caching access for query", 0) \
|
||||
M(Bool, enable_filesystem_cache_log, true, "Allows to record the filesystem caching log for each query", 0) \
|
||||
M(Bool, read_from_filesystem_cache_if_exists_otherwise_bypass_cache, false, "", 0) \
|
||||
\
|
||||
M(Bool, use_structure_from_insertion_table_in_table_functions, false, "Use structure from insertion table instead of schema inference from data", 0) \
|
||||
|
@ -50,20 +50,37 @@ CachedReadBufferFromRemoteFS::CachedReadBufferFromRemoteFS(
|
||||
, remote_file_reader_creator(remote_file_reader_creator_)
|
||||
, query_id(getQueryId())
|
||||
{
|
||||
if (!query_id.empty() && (CurrentThread::get().getQueryContext()->getSettingsRef().enable_cache_log))
|
||||
{
|
||||
if (!query_id.empty() && (CurrentThread::get().getQueryContext()->getSettingsRef().enable_filesystem_cache_log))
|
||||
enable_logging = true;
|
||||
cache->addFilesystemCacheLogRef(query_id);
|
||||
}
|
||||
}
|
||||
|
||||
CachedReadBufferFromRemoteFS::~CachedReadBufferFromRemoteFS()
|
||||
void CachedReadBufferFromRemoteFS::appendFilesystemCacheLog(
|
||||
const FileSegment::Range & file_segment_range, CachedReadBufferFromRemoteFS::ReadType type)
|
||||
{
|
||||
if (enable_logging)
|
||||
FilesystemCacheLogElement elem
|
||||
{
|
||||
cache->updateFilesystemCacheLog(query_id, remote_fs_object_path, cache_hit_count, cache_miss_count);
|
||||
cache->decFilesystemCacheLogRef(query_id);
|
||||
.query_id = query_id,
|
||||
.remote_file_path = remote_fs_object_path,
|
||||
};
|
||||
|
||||
const auto current_time = std::chrono::system_clock::now();
|
||||
elem.event_time = std::chrono::system_clock::to_time_t(current_time);
|
||||
elem.file_segment_range = std::make_pair(file_segment_range.left, file_segment_range.right);
|
||||
|
||||
switch(type)
|
||||
{
|
||||
case CachedReadBufferFromRemoteFS::ReadType::CACHED:
|
||||
elem.read_type = FilesystemCacheLogElement::ReadType::READ_FROM_CACHE;
|
||||
break;
|
||||
case CachedReadBufferFromRemoteFS::ReadType::REMOTE_FS_READ_BYPASS_CACHE:
|
||||
elem.read_type = FilesystemCacheLogElement::ReadType::READ_FROM_FS_AND_DOWNLOADED_TO_CACHE;
|
||||
break;
|
||||
case CachedReadBufferFromRemoteFS::ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE:
|
||||
elem.read_type = FilesystemCacheLogElement::ReadType::READ_FROM_FS_BYPASSING_CACHE;
|
||||
break;
|
||||
}
|
||||
|
||||
Context::getGlobalContextInstance()->getFilesystemCacheLog()->add(elem);
|
||||
}
|
||||
|
||||
void CachedReadBufferFromRemoteFS::initialize(size_t offset, size_t size)
|
||||
@ -103,8 +120,7 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getRemoteFSReadBuffer(FileSe
|
||||
{
|
||||
switch (read_type_)
|
||||
{
|
||||
case ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE:
|
||||
{
|
||||
case ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE: {
|
||||
/**
|
||||
* Each downloader is elected to download at most buffer_size bytes and then any other can
|
||||
* continue. The one who continues download should reuse download buffer.
|
||||
@ -131,8 +147,7 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getRemoteFSReadBuffer(FileSe
|
||||
|
||||
return remote_fs_segment_reader;
|
||||
}
|
||||
case ReadType::REMOTE_FS_READ_BYPASS_CACHE:
|
||||
{
|
||||
case ReadType::REMOTE_FS_READ_BYPASS_CACHE: {
|
||||
/// Result buffer is owned only by current buffer -- not shareable like in the case above.
|
||||
|
||||
if (remote_file_reader && remote_file_reader->getFileOffsetOfBufferEnd() == file_offset_of_buffer_end)
|
||||
@ -434,12 +449,9 @@ bool CachedReadBufferFromRemoteFS::completeFileSegmentAndGetNext()
|
||||
implementation_buffer = getImplementationBuffer(*current_file_segment_it);
|
||||
|
||||
if (read_type == ReadType::CACHED)
|
||||
{
|
||||
cache_hit_count++;
|
||||
(*current_file_segment_it)->incrementHitsCount();
|
||||
}
|
||||
else
|
||||
cache_miss_count++;
|
||||
if (enable_logging)
|
||||
appendFilesystemCacheLog((*current_file_segment_it)->range(), read_type);
|
||||
|
||||
LOG_TEST(log, "New segment: {}", (*current_file_segment_it)->range().toString());
|
||||
return true;
|
||||
@ -684,12 +696,10 @@ bool CachedReadBufferFromRemoteFS::nextImplStep()
|
||||
implementation_buffer = getImplementationBuffer(*current_file_segment_it);
|
||||
|
||||
if (read_type == ReadType::CACHED)
|
||||
{
|
||||
cache_hit_count++;
|
||||
(*current_file_segment_it)->incrementHitsCount();
|
||||
}
|
||||
else
|
||||
cache_miss_count++;
|
||||
|
||||
if (enable_logging)
|
||||
appendFilesystemCacheLog((*current_file_segment_it)->range(), read_type);
|
||||
}
|
||||
|
||||
assert(!internal_buffer.empty());
|
||||
|
@ -4,7 +4,8 @@
|
||||
#include <IO/SeekableReadBuffer.h>
|
||||
#include <IO/WriteBufferFromFile.h>
|
||||
#include <IO/ReadSettings.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <base/logger_useful.h>
|
||||
#include <Interpreters/FilesystemCacheLog.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -21,8 +22,6 @@ public:
|
||||
const ReadSettings & settings_,
|
||||
size_t read_until_position_);
|
||||
|
||||
~CachedReadBufferFromRemoteFS() override;
|
||||
|
||||
bool nextImpl() override;
|
||||
|
||||
off_t seek(off_t off, int whence) override;
|
||||
@ -66,6 +65,8 @@ private:
|
||||
size_t getTotalSizeToRead();
|
||||
bool completeFileSegmentAndGetNext();
|
||||
|
||||
void appendFilesystemCacheLog(const FileSegment::Range &file_segment_range, ReadType read_type);
|
||||
|
||||
Poco::Logger * log;
|
||||
IFileCache::Key cache_key;
|
||||
String remote_fs_object_path;
|
||||
@ -76,9 +77,6 @@ private:
|
||||
size_t file_offset_of_buffer_end = 0;
|
||||
size_t bytes_to_predownload = 0;
|
||||
|
||||
size_t cache_hit_count = 0;
|
||||
size_t cache_miss_count = 0;
|
||||
|
||||
RemoteFSFileReaderCreator remote_file_reader_creator;
|
||||
|
||||
/// Remote read buffer, which can only be owned by current buffer.
|
||||
|
@ -49,7 +49,7 @@ void FilesystemCacheLogElement::appendToBlock(MutableColumns & columns) const
|
||||
columns[i++]->insert(query_id);
|
||||
|
||||
columns[i++]->insert(remote_file_path);
|
||||
columns[i++]->insert({file_segment_range.first, file_segment_range.second});
|
||||
columns[i++]->insert(Tuple{file_segment_range.first, file_segment_range.second});
|
||||
columns[i++]->insert(typeToString(read_type));
|
||||
}
|
||||
|
||||
|
@ -4,7 +4,7 @@
|
||||
#include <Core/NamesAndTypes.h>
|
||||
#include <Core/Settings.h>
|
||||
#include <DataTypes/DataTypeArray.h>
|
||||
#include <DataTypes/DataTypeNumber.h>
|
||||
#include <DataTypes/DataTypeNumberBase.h>
|
||||
#include <DataTypes/DataTypeTuple.h>
|
||||
#include <Interpreters/SystemLog.h>
|
||||
#include <Interpreters/TransactionVersionMetadata.h>
|
||||
@ -48,16 +48,6 @@ struct FilesystemCacheLogElement
|
||||
static const char * getCustomColumnList() { return nullptr; }
|
||||
};
|
||||
|
||||
using FilesystemCacheLogElementPtr = std::shared_ptr<FilesystemCacheLogElement>;
|
||||
using FilesystemCacheLogs = std::unordered_map<String, FilesystemCacheLogElementPtr>;
|
||||
using FilesystemCacheLogsPtr = std::shared_ptr<FilesystemCacheLogs>;
|
||||
|
||||
struct FilesystemCacheLogsRecorder
|
||||
{
|
||||
size_t ref = 0;
|
||||
FilesystemCacheLogsPtr logs;
|
||||
};
|
||||
|
||||
class FilesystemCacheLog : public SystemLog<FilesystemCacheLogElement>
|
||||
{
|
||||
using SystemLog<FilesystemCacheLogElement>::SystemLog;
|
||||
|
Loading…
Reference in New Issue
Block a user