mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-29 02:52:13 +00:00
Add entries to cache log when cache was not event attempted
This commit is contained in:
parent
000f2c9f7a
commit
0266cdf125
13
src/Common/getQueryId.h
Normal file
13
src/Common/getQueryId.h
Normal file
@ -0,0 +1,13 @@
|
||||
#include <Common/CurrentThread.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
static String getQueryId()
|
||||
{
|
||||
if (!CurrentThread::isInitialized() || !CurrentThread::get().getQueryContext())
|
||||
return "";
|
||||
return CurrentThread::getQueryId().toString();
|
||||
}
|
||||
|
||||
}
|
@ -1,10 +1,10 @@
|
||||
#include "AsynchronousReadIndirectBufferFromRemoteFS.h"
|
||||
|
||||
#include <Common/Stopwatch.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Disks/IO/ThreadPoolRemoteFSReader.h>
|
||||
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
|
||||
#include <IO/ReadSettings.h>
|
||||
#include <Common/logger_useful.h>
|
||||
|
||||
|
||||
namespace CurrentMetrics
|
||||
@ -57,7 +57,6 @@ AsynchronousReadIndirectBufferFromRemoteFS::AsynchronousReadIndirectBufferFromRe
|
||||
ProfileEvents::increment(ProfileEvents::RemoteFSBuffers);
|
||||
}
|
||||
|
||||
|
||||
String AsynchronousReadIndirectBufferFromRemoteFS::getFileName() const
|
||||
{
|
||||
return impl->getFileName();
|
||||
|
@ -5,6 +5,7 @@
|
||||
#include <base/scope_guard.h>
|
||||
#include <Common/assert_cast.h>
|
||||
#include <Common/hex.h>
|
||||
#include <Common/getQueryId.h>
|
||||
|
||||
|
||||
namespace ProfileEvents
|
||||
@ -23,13 +24,6 @@ namespace ErrorCodes
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
static String getQueryId()
|
||||
{
|
||||
if (!CurrentThread::isInitialized() || !CurrentThread::get().getQueryContext() || CurrentThread::getQueryId().size == 0)
|
||||
return "";
|
||||
return CurrentThread::getQueryId().toString();
|
||||
}
|
||||
|
||||
CachedReadBufferFromRemoteFS::CachedReadBufferFromRemoteFS(
|
||||
const String & remote_fs_object_path_,
|
||||
FileCachePtr cache_,
|
||||
@ -49,7 +43,7 @@ CachedReadBufferFromRemoteFS::CachedReadBufferFromRemoteFS(
|
||||
, read_until_position(read_until_position_)
|
||||
, remote_file_reader_creator(remote_file_reader_creator_)
|
||||
, query_id(getQueryId())
|
||||
, enable_logging(!query_id.empty() && CurrentThread::get().getQueryContext()->getSettingsRef().enable_filesystem_cache_log)
|
||||
, enable_logging(!query_id.empty() && settings_.enable_filesystem_cache_log)
|
||||
{
|
||||
}
|
||||
|
||||
@ -63,6 +57,7 @@ void CachedReadBufferFromRemoteFS::appendFilesystemCacheLog(
|
||||
.source_file_path = remote_fs_object_path,
|
||||
.file_segment_range = { file_segment_range.left, file_segment_range.right },
|
||||
.file_segment_size = file_segment_range.size(),
|
||||
.cache_attempted = true,
|
||||
};
|
||||
|
||||
switch (type)
|
||||
|
@ -21,9 +21,12 @@
|
||||
#include <filesystem>
|
||||
#include <iostream>
|
||||
#include <Common/hex.h>
|
||||
#include <Common/getQueryId.h>
|
||||
#include <Interpreters/FilesystemCacheLog.h>
|
||||
|
||||
namespace fs = std::filesystem;
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
@ -39,9 +42,6 @@ SeekableReadBufferPtr ReadBufferFromS3Gather::createImplementationBuffer(const S
|
||||
auto remote_path = fs::path(common_path_prefix) / path;
|
||||
|
||||
auto cache = settings.remote_fs_cache;
|
||||
bool with_cache = cache
|
||||
&& settings.enable_filesystem_cache
|
||||
&& (!IFileCache::isReadOnly() || settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache);
|
||||
|
||||
auto remote_file_reader_creator = [=, this]()
|
||||
{
|
||||
@ -95,7 +95,30 @@ ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(
|
||||
, blobs_to_read(blobs_to_read_)
|
||||
, settings(settings_)
|
||||
, log(&Poco::Logger::get("ReadBufferFromRemoteFSGather"))
|
||||
, query_id(getQueryId())
|
||||
, enable_cache_log(!query_id.empty() && settings.enable_filesystem_cache_log)
|
||||
{
|
||||
with_cache = settings.remote_fs_cache
|
||||
&& settings.enable_filesystem_cache
|
||||
&& (!IFileCache::isReadOnly() || settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache);
|
||||
}
|
||||
|
||||
|
||||
void ReadBufferFromRemoteFSGather::appendFilesystemCacheLog()
|
||||
{
|
||||
FilesystemCacheLogElement elem
|
||||
{
|
||||
.event_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()),
|
||||
.query_id = query_id,
|
||||
.source_file_path = getFileName(),
|
||||
.file_segment_range = { 0, total_bytes_read },
|
||||
.read_type = FilesystemCacheLogElement::ReadType::READ_FROM_FS_BYPASSING_CACHE,
|
||||
.file_segment_size = total_bytes_read,
|
||||
.cache_attempted = false,
|
||||
};
|
||||
|
||||
if (auto cache_log = Context::getGlobalContextInstance()->getFilesystemCacheLog())
|
||||
cache_log->add(elem);
|
||||
}
|
||||
|
||||
|
||||
@ -199,6 +222,7 @@ bool ReadBufferFromRemoteFSGather::readImpl()
|
||||
*/
|
||||
if (bytes_to_ignore)
|
||||
{
|
||||
total_bytes_read += bytes_to_ignore;
|
||||
current_buf->ignore(bytes_to_ignore);
|
||||
result = current_buf->hasPendingData();
|
||||
file_offset_of_buffer_end += bytes_to_ignore;
|
||||
@ -225,6 +249,7 @@ bool ReadBufferFromRemoteFSGather::readImpl()
|
||||
{
|
||||
assert(available());
|
||||
nextimpl_working_buffer_offset = offset();
|
||||
total_bytes_read += available();
|
||||
}
|
||||
|
||||
return result;
|
||||
@ -282,5 +307,12 @@ size_t ReadBufferFromRemoteFSGather::getImplementationBufferOffset() const
|
||||
return current_buf->getFileOffsetOfBufferEnd();
|
||||
}
|
||||
|
||||
ReadBufferFromRemoteFSGather::~ReadBufferFromRemoteFSGather()
|
||||
{
|
||||
if (!with_cache && enable_cache_log)
|
||||
{
|
||||
appendFilesystemCacheLog();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -29,6 +29,9 @@ public:
|
||||
const std::string & common_path_prefix_,
|
||||
const BlobsPathToSize & blobs_to_read_,
|
||||
const ReadSettings & settings_);
|
||||
|
||||
~ReadBufferFromRemoteFSGather() override;
|
||||
|
||||
String getFileName() const;
|
||||
|
||||
void reset();
|
||||
@ -66,6 +69,8 @@ protected:
|
||||
|
||||
String current_path;
|
||||
|
||||
bool with_cache;
|
||||
|
||||
private:
|
||||
bool nextImpl() override;
|
||||
|
||||
@ -75,6 +80,8 @@ private:
|
||||
|
||||
bool moveToNextBuffer();
|
||||
|
||||
void appendFilesystemCacheLog();
|
||||
|
||||
SeekableReadBufferPtr current_buf;
|
||||
|
||||
size_t current_buf_idx = 0;
|
||||
@ -89,6 +96,12 @@ private:
|
||||
size_t bytes_to_ignore = 0;
|
||||
|
||||
Poco::Logger * log;
|
||||
|
||||
String query_id;
|
||||
|
||||
size_t total_bytes_read = 0;
|
||||
|
||||
bool enable_cache_log = false;
|
||||
};
|
||||
|
||||
|
||||
|
@ -80,6 +80,7 @@ struct ReadSettings
|
||||
bool enable_filesystem_cache = true;
|
||||
size_t filesystem_cache_max_wait_sec = 1;
|
||||
bool read_from_filesystem_cache_if_exists_otherwise_bypass_cache = false;
|
||||
bool enable_filesystem_cache_log = false;
|
||||
|
||||
size_t remote_read_min_bytes_for_seek = DBMS_DEFAULT_BUFFER_SIZE;
|
||||
|
||||
|
@ -3407,6 +3407,7 @@ ReadSettings Context::getReadSettings() const
|
||||
res.enable_filesystem_cache = settings.enable_filesystem_cache;
|
||||
res.filesystem_cache_max_wait_sec = settings.filesystem_cache_max_wait_sec;
|
||||
res.read_from_filesystem_cache_if_exists_otherwise_bypass_cache = settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache;
|
||||
res.enable_filesystem_cache_log = settings.enable_filesystem_cache_log;
|
||||
|
||||
res.remote_read_min_bytes_for_seek = settings.remote_read_min_bytes_for_seek;
|
||||
|
||||
|
@ -37,6 +37,7 @@ NamesAndTypesList FilesystemCacheLogElement::getNamesAndTypes()
|
||||
{"file_segment_range", std::make_shared<DataTypeTuple>(std::move(types))},
|
||||
{"size", std::make_shared<DataTypeUInt64>()},
|
||||
{"read_type", std::make_shared<DataTypeString>()},
|
||||
{"cache_attempted", std::make_shared<DataTypeUInt8>()},
|
||||
};
|
||||
}
|
||||
|
||||
@ -53,6 +54,7 @@ void FilesystemCacheLogElement::appendToBlock(MutableColumns & columns) const
|
||||
columns[i++]->insert(Tuple{file_segment_range.first, file_segment_range.second});
|
||||
columns[i++]->insert(file_segment_size);
|
||||
columns[i++]->insert(typeToString(read_type));
|
||||
columns[i++]->insert(cache_attempted);
|
||||
}
|
||||
|
||||
};
|
||||
|
@ -39,6 +39,7 @@ struct FilesystemCacheLogElement
|
||||
std::pair<size_t, size_t> file_segment_range{};
|
||||
ReadType read_type{};
|
||||
size_t file_segment_size;
|
||||
bool cache_attempted;
|
||||
|
||||
static std::string name() { return "FilesystemCacheLog"; }
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user