impl system.log_table

This commit is contained in:
KinderRiven 2022-04-29 17:45:45 +08:00
parent a4b8f9e9cf
commit ce9a6965d0
6 changed files with 139 additions and 97 deletions

View File

@ -41,7 +41,7 @@ IFileCache::IFileCache(
{
}
void IFileCache::addQueryRef(String & query_id)
void IFileCache::addQueryRef(const String & query_id)
{
/// must be a query log
if (query_id.size())
@ -57,7 +57,7 @@ void IFileCache::addQueryRef(String & query_id)
}
}
void IFileCache::DecQueryRef(String & query_id)
void IFileCache::DecQueryRef(const String & query_id)
{
/// must be a query log
if (query_id.size())
@ -80,7 +80,7 @@ void IFileCache::DecQueryRef(String & query_id)
}
}
void IFileCache::updateQueryCacheLog(String & query_id, String & remote_fs_path, size_t hit_count, size_t miss_count)
void IFileCache::updateQueryCacheLog(const String & query_id, const String & remote_fs_path, size_t hit_count, size_t miss_count)
{
/// must be a query log
if (query_id.size())

View File

@ -92,11 +92,11 @@ public:
/// For debug.
virtual String dumpStructure(const Key & key) = 0;
void addQueryRef(String & query_id);
void addQueryRef(const String & query_id);
void DecQueryRef(String & query_id);
void DecQueryRef(const String & query_id);
void updateQueryCacheLog(String &query_id, String &remote_fs_path, size_t hit_count, size_t miss_count);
void updateQueryCacheLog(const String &query_id, const String &remote_fs_path, size_t hit_count, size_t miss_count);
protected:
String cache_base_path;

View File

@ -557,6 +557,7 @@ class IColumn;
M(Bool, enable_filesystem_cache, true, "Use cache for remote filesystem. This setting does not turn on/off cache for disks (must me done via disk config), but allows to bypass cache for some queries if intended", 0) \
M(UInt64, filesystem_cache_max_wait_sec, 5, "Allow to wait at most this number of seconds for download of current remote_fs_buffer_size bytes, and skip cache if exceeded", 0) \
M(Bool, enable_filesystem_cache_on_write_operations, false, "Write into cache on write operations. To actually work this setting requires be added to disk config too", 0) \
M(Bool, enable_cache_log, true, "Allows to record the caching access for query", 0) \
M(Bool, read_from_filesystem_cache_if_exists_otherwise_bypass_cache, false, "", 0) \
\
M(Bool, use_structure_from_insertion_table_in_table_functions, false, "Use structure from insertion table instead of schema inference from data", 0) \

View File

@ -1,17 +1,17 @@
#include "CachedReadBufferFromRemoteFS.h"
#include <Common/assert_cast.h>
#include <Common/hex.h>
#include <Disks/IO/createReadBufferFromFileBase.h>
#include <IO/ReadBufferFromFile.h>
#include <base/scope_guard.h>
#include <Common/assert_cast.h>
#include <Common/hex.h>
/// Profile event counters that this file increments while reading
/// (see the ProfileEvents::increment calls in nextImplStep).
/// They are only declared here; the definitions live elsewhere.
namespace ProfileEvents
{
    extern const Event RemoteFSReadBytes;
    extern const Event RemoteFSCacheReadBytes;
    extern const Event RemoteFSCacheDownloadBytes;
}
namespace DB
@ -42,14 +42,17 @@ CachedReadBufferFromRemoteFS::CachedReadBufferFromRemoteFS(
, read_until_position(read_until_position_)
, remote_file_reader_creator(remote_file_reader_creator_)
{
LOG_TRACE(log, "current_query_id is {}", settings.current_query_id);
cache->addQueryRef(settings.current_query_id);
if ((CurrentThread::get().getQueryContext()->getSettingsRef().enable_cache_log) && (CurrentThread::getQueryId().size))
cache->addQueryRef(CurrentThread::getQueryId().toString());
}
/// Publishes this buffer's cache hit/miss counters to the per-query cache log and
/// drops the query reference on the cache. Both steps are performed only when the
/// current query has `enable_cache_log` set and a non-empty query id is available.
CachedReadBufferFromRemoteFS::~CachedReadBufferFromRemoteFS()
{
    /// The query context is dereferenced below, so do nothing when the current thread has none.
    /// NOTE(review): the constructor takes the reference under the same checks, but evaluated
    /// at construction time. If this destructor can run on a different thread or after the
    /// query context is gone, addQueryRef/DecQueryRef may not pair up -- confirm with callers.
    const auto query_context = CurrentThread::get().getQueryContext();
    if (!query_context || !query_context->getSettingsRef().enable_cache_log)
        return;

    if (!CurrentThread::getQueryId().size)
        return;

    /// Materialize the id once and reuse it for both cache calls.
    String query_id = CurrentThread::getQueryId().toString();
    cache->updateQueryCacheLog(query_id, remote_fs_object_path, cache_hit_count, cache_miss_count);
    cache->DecQueryRef(query_id);
}
void CachedReadBufferFromRemoteFS::initialize(size_t offset, size_t size)
@ -89,8 +92,7 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getRemoteFSReadBuffer(FileSe
{
switch (read_type_)
{
case ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE:
{
case ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE: {
/**
* Each downloader is elected to download at most buffer_size bytes and then any other can
* continue. The one who continues download should reuse download buffer.
@ -117,8 +119,7 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getRemoteFSReadBuffer(FileSe
return remote_fs_segment_reader;
}
case ReadType::REMOTE_FS_READ_BYPASS_CACHE:
{
case ReadType::REMOTE_FS_READ_BYPASS_CACHE: {
/// Result buffer is owned only by current buffer -- not shareable like in the case above.
if (remote_file_reader && remote_file_reader->getFileOffsetOfBufferEnd() == file_offset_of_buffer_end)
@ -128,8 +129,7 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getRemoteFSReadBuffer(FileSe
return remote_file_reader;
}
default:
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Cannot use remote filesystem reader with read type: {}", toString(read_type));
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot use remote filesystem reader with read type: {}", toString(read_type));
}
}
@ -160,13 +160,11 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getReadBufferForFileSegment(
{
switch (download_state)
{
case FileSegment::State::SKIP_CACHE:
{
case FileSegment::State::SKIP_CACHE: {
read_type = ReadType::REMOTE_FS_READ_BYPASS_CACHE;
return getRemoteFSReadBuffer(file_segment, read_type);
}
case FileSegment::State::EMPTY:
{
case FileSegment::State::EMPTY: {
auto downloader_id = file_segment->getOrSetDownloader();
if (downloader_id == file_segment->getCallerId())
{
@ -197,8 +195,7 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getReadBufferForFileSegment(
continue;
}
}
case FileSegment::State::DOWNLOADING:
{
case FileSegment::State::DOWNLOADING: {
size_t download_offset = file_segment->getDownloadOffset();
bool can_start_from_cache = download_offset > file_offset_of_buffer_end;
@ -228,13 +225,11 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getReadBufferForFileSegment(
continue;
}
case FileSegment::State::DOWNLOADED:
{
case FileSegment::State::DOWNLOADED: {
read_type = ReadType::CACHED;
return getCacheReadBuffer(range.left);
}
case FileSegment::State::PARTIALLY_DOWNLOADED:
{
case FileSegment::State::PARTIALLY_DOWNLOADED: {
auto downloader_id = file_segment->getOrSetDownloader();
if (downloader_id == file_segment->getCallerId())
{
@ -283,8 +278,7 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getReadBufferForFileSegment(
download_state = file_segment->state();
continue;
}
case FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION:
{
case FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION: {
size_t download_offset = file_segment->getDownloadOffset();
bool can_start_from_cache = download_offset > file_offset_of_buffer_end;
@ -319,15 +313,18 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getImplementationBuffer(File
assert(file_segment->range() == range);
assert(file_offset_of_buffer_end >= range.left && file_offset_of_buffer_end <= range.right);
LOG_TEST(log, "Current file segment: {}, read type: {}, current file offset: {}",
range.toString(), toString(read_type), file_offset_of_buffer_end);
LOG_TEST(
log,
"Current file segment: {}, read type: {}, current file offset: {}",
range.toString(),
toString(read_type),
file_offset_of_buffer_end);
read_buffer_for_file_segment->setReadUntilPosition(range.right + 1); /// [..., range.right]
switch (read_type)
{
case ReadType::CACHED:
{
case ReadType::CACHED: {
#ifndef NDEBUG
auto * file_reader = assert_cast<ReadBufferFromFile *>(read_buffer_for_file_segment.get());
size_t file_size = file_reader->size();
@ -337,25 +334,30 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getImplementationBuffer(File
ErrorCodes::LOGICAL_ERROR,
"Unexpected state of cache file. Cache file size: {}, cache file offset: {}, "
"expected file size to be non-zero and file downloaded size to exceed current file read offset (expected: {} > {})",
file_size, range.left, range.left + file_size, file_offset_of_buffer_end);
file_size,
range.left,
range.left + file_size,
file_offset_of_buffer_end);
#endif
size_t seek_offset = file_offset_of_buffer_end - range.left;
if (file_offset_of_buffer_end < range.left)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Invariant failed. Expected {} > {} (current offset > file segment's start offset)", file_offset_of_buffer_end, range.left);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Invariant failed. Expected {} > {} (current offset > file segment's start offset)",
file_offset_of_buffer_end,
range.left);
read_buffer_for_file_segment->seek(seek_offset, SEEK_SET);
break;
}
case ReadType::REMOTE_FS_READ_BYPASS_CACHE:
{
case ReadType::REMOTE_FS_READ_BYPASS_CACHE: {
read_buffer_for_file_segment->seek(file_offset_of_buffer_end, SEEK_SET);
break;
}
case ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE:
{
case ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE: {
assert(file_segment->isDownloader());
if (bytes_to_predownload)
@ -374,10 +376,15 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getImplementationBuffer(File
auto impl_range = read_buffer_for_file_segment->getRemainingReadRange();
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Buffer's offsets mismatch; cached buffer offset: {}, download_offset: {}, position: {}, implementation buffer offset: {}, "
"Buffer's offsets mismatch; cached buffer offset: {}, download_offset: {}, position: {}, implementation buffer offset: "
"{}, "
"implementation buffer reading until: {}, file segment info: {}",
file_offset_of_buffer_end, download_offset, read_buffer_for_file_segment->getPosition(),
impl_range.left, *impl_range.right, file_segment->getInfoForLog());
file_offset_of_buffer_end,
download_offset,
read_buffer_for_file_segment->getPosition(),
impl_range.left,
*impl_range.right,
file_segment->getInfoForLog());
}
break;
@ -397,8 +404,12 @@ bool CachedReadBufferFromRemoteFS::completeFileSegmentAndGetNext()
[[maybe_unused]] const auto & range = file_segment->range();
assert(file_offset_of_buffer_end > range.right);
LOG_TEST(log, "Removing file segment: {}, downloader: {}, state: {}",
file_segment->range().toString(), file_segment->getDownloader(), file_segment->state());
LOG_TEST(
log,
"Removing file segment: {}, downloader: {}, state: {}",
file_segment->range().toString(),
file_segment->getDownloader(),
file_segment->state());
/// Do not hold pointer to file segment if it is not needed anymore
/// so can become releasable and can be evicted from cache.
@ -458,11 +469,17 @@ void CachedReadBufferFromRemoteFS::predownload(FileSegmentPtr & file_segment)
nextimpl_working_buffer_offset = implementation_buffer->offset();
auto download_offset = file_segment->getDownloadOffset();
if (download_offset != static_cast<size_t>(implementation_buffer->getPosition()) || download_offset != file_offset_of_buffer_end)
if (download_offset != static_cast<size_t>(implementation_buffer->getPosition())
|| download_offset != file_offset_of_buffer_end)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Buffer's offsets mismatch after predownloading; download offset: {}, cached buffer offset: {}, implementation buffer offset: {}, "
"file segment info: {}", download_offset, file_offset_of_buffer_end, implementation_buffer->getPosition(), file_segment->getInfoForLog());
"Buffer's offsets mismatch after predownloading; download offset: {}, cached buffer offset: {}, implementation "
"buffer offset: {}, "
"file segment info: {}",
download_offset,
file_offset_of_buffer_end,
implementation_buffer->getPosition(),
file_segment->getInfoForLog());
}
break;
@ -516,7 +533,8 @@ void CachedReadBufferFromRemoteFS::predownload(FileSegmentPtr & file_segment)
implementation_buffer->seek(file_offset_of_buffer_end, SEEK_SET);
LOG_TEST(
log, "Predownload failed because of space limit. Will read from remote filesystem starting from offset: {}",
log,
"Predownload failed because of space limit. Will read from remote filesystem starting from offset: {}",
file_offset_of_buffer_end);
break;
@ -560,7 +578,8 @@ bool CachedReadBufferFromRemoteFS::updateImplementationBufferIfNeeded()
return true;
}
else if (download_offset < file_offset_of_buffer_end)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected {} >= {} ({})", download_offset, file_offset_of_buffer_end, getInfoForLog());
throw Exception(
ErrorCodes::LOGICAL_ERROR, "Expected {} >= {} ({})", download_offset, file_offset_of_buffer_end, getInfoForLog());
}
if (read_type == ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE)
@ -666,8 +685,13 @@ bool CachedReadBufferFromRemoteFS::nextImplStep()
auto & file_segment = *current_file_segment_it;
auto current_read_range = file_segment->range();
LOG_TEST(log, "Current segment: {}, downloader: {}, current count: {}, position: {}",
current_read_range.toString(), file_segment->getDownloader(), implementation_buffer->count(), implementation_buffer->getPosition());
LOG_TEST(
log,
"Current segment: {}, downloader: {}, current count: {}, position: {}",
current_read_range.toString(),
file_segment->getDownloader(),
implementation_buffer->count(),
implementation_buffer->getPosition());
assert(current_read_range.left <= file_offset_of_buffer_end);
assert(current_read_range.right >= file_offset_of_buffer_end);
@ -689,7 +713,10 @@ bool CachedReadBufferFromRemoteFS::nextImplStep()
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Incorrect segment state. Having read type: {}, Caller id: {}, downloader id: {}, file segment state: {}",
toString(read_type), file_segment->getCallerId(), file_segment->getDownloader(), file_segment->state());
toString(read_type),
file_segment->getCallerId(),
file_segment->getDownloader(),
file_segment->state());
if (!result)
{
@ -699,9 +726,7 @@ bool CachedReadBufferFromRemoteFS::nextImplStep()
auto cache_file_size = cache_file_reader->size();
if (cache_file_size == 0)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Attempt to read from an empty cache file: {} (just before actual read)",
cache_file_size);
ErrorCodes::LOGICAL_ERROR, "Attempt to read from an empty cache file: {} (just before actual read)", cache_file_size);
}
#endif
@ -719,10 +744,15 @@ bool CachedReadBufferFromRemoteFS::nextImplStep()
{
assert(file_segment->getDownloadOffset() == static_cast<size_t>(implementation_buffer->getPosition()));
file_segment->write(needed_to_predownload ? implementation_buffer->position() : implementation_buffer->buffer().begin(), size, file_offset_of_buffer_end);
file_segment->write(
needed_to_predownload ? implementation_buffer->position() : implementation_buffer->buffer().begin(),
size,
file_offset_of_buffer_end);
assert(file_segment->getDownloadOffset() <= file_segment->range().right + 1);
assert(std::next(current_file_segment_it) == file_segments_holder->file_segments.end() || file_segment->getDownloadOffset() == implementation_buffer->getFileOffsetOfBufferEnd());
assert(
std::next(current_file_segment_it) == file_segments_holder->file_segments.end()
|| file_segment->getDownloadOffset() == implementation_buffer->getFileOffsetOfBufferEnd());
}
else
{
@ -734,18 +764,15 @@ bool CachedReadBufferFromRemoteFS::nextImplStep()
switch (read_type)
{
case ReadType::CACHED:
{
case ReadType::CACHED: {
ProfileEvents::increment(ProfileEvents::RemoteFSCacheReadBytes, size);
break;
}
case ReadType::REMOTE_FS_READ_BYPASS_CACHE:
{
case ReadType::REMOTE_FS_READ_BYPASS_CACHE: {
ProfileEvents::increment(ProfileEvents::RemoteFSReadBytes, size);
break;
}
case ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE:
{
case ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE: {
ProfileEvents::increment(ProfileEvents::RemoteFSReadBytes, size);
ProfileEvents::increment(ProfileEvents::RemoteFSCacheDownloadBytes, size);
break;
@ -774,14 +801,25 @@ bool CachedReadBufferFromRemoteFS::nextImplStep()
assert(!file_segment->isDownloader());
LOG_TEST(log,
"Key: {}. Returning with {} bytes, buffer position: {} (offset: {}, predownloaded: {}), "
"buffer available: {}, current range: {}, current offset: {}, file segment state: {}, download offset: {}, read_type: {}, "
"reading until position: {}, started with offset: {}, remaining ranges: {}",
getHexUIntLowercase(cache_key), working_buffer.size(), getPosition(), offset(), needed_to_predownload,
available(), current_read_range.toString(),
file_offset_of_buffer_end, FileSegment::stateToString(file_segment->state()), file_segment->getDownloadOffset(), toString(read_type),
read_until_position, first_offset, file_segments_holder->toString());
LOG_TEST(
log,
"Key: {}. Returning with {} bytes, buffer position: {} (offset: {}, predownloaded: {}), "
"buffer available: {}, current range: {}, current offset: {}, file segment state: {}, download offset: {}, read_type: {}, "
"reading until position: {}, started with offset: {}, remaining ranges: {}",
getHexUIntLowercase(cache_key),
working_buffer.size(),
getPosition(),
offset(),
needed_to_predownload,
available(),
current_read_range.toString(),
file_offset_of_buffer_end,
FileSegment::stateToString(file_segment->state()),
file_segment->getDownloadOffset(),
toString(read_type),
read_until_position,
first_offset,
file_segments_holder->toString());
if (size == 0 && file_offset_of_buffer_end < read_until_position)
{
@ -789,9 +827,13 @@ bool CachedReadBufferFromRemoteFS::nextImplStep()
if (auto * cache_file_reader = dynamic_cast<ReadBufferFromFile *>(implementation_buffer.get()))
cache_file_size = cache_file_reader->size();
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Having zero bytes, but range is not finished: file offset: {}, reading until: {}, read type: {}, cache file size: {}",
file_offset_of_buffer_end, read_until_position, toString(read_type), cache_file_size ? std::to_string(*cache_file_size) : "None");
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Having zero bytes, but range is not finished: file offset: {}, reading until: {}, read type: {}, cache file size: {}",
file_offset_of_buffer_end,
read_until_position,
toString(read_type),
cache_file_size ? std::to_string(*cache_file_size) : "None");
}
return result;
@ -800,8 +842,7 @@ bool CachedReadBufferFromRemoteFS::nextImplStep()
off_t CachedReadBufferFromRemoteFS::seek(off_t offset, int whence)
{
if (initialized)
throw Exception(ErrorCodes::CANNOT_SEEK_THROUGH_FILE,
"Seek is allowed only before first read attempt from the buffer");
throw Exception(ErrorCodes::CANNOT_SEEK_THROUGH_FILE, "Seek is allowed only before first read attempt from the buffer");
if (whence != SEEK_SET)
throw Exception(ErrorCodes::CANNOT_SEEK_THROUGH_FILE, "Only SEEK_SET allowed");
@ -822,8 +863,8 @@ size_t CachedReadBufferFromRemoteFS::getTotalSizeToRead()
/// On this level should be guaranteed that read size is non-zero.
if (file_offset_of_buffer_end >= read_until_position)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Read boundaries mismatch. Expected {} < {}",
file_offset_of_buffer_end, read_until_position);
throw Exception(
ErrorCodes::LOGICAL_ERROR, "Read boundaries mismatch. Expected {} < {}", file_offset_of_buffer_end, read_until_position);
return read_until_position - file_offset_of_buffer_end;
}
@ -866,22 +907,25 @@ String CachedReadBufferFromRemoteFS::getInfoForLog()
if (implementation_buffer)
{
auto read_range = implementation_buffer->getRemainingReadRange();
implementation_buffer_read_range_str = std::to_string(read_range.left) + '-' + (read_range.right ? std::to_string(*read_range.right) : "None");
implementation_buffer_read_range_str
= std::to_string(read_range.left) + '-' + (read_range.right ? std::to_string(*read_range.right) : "None");
}
else
implementation_buffer_read_range_str = "None";
auto current_file_segment_info = current_file_segment_it == file_segments_holder->file_segments.end() ? "None" : (*current_file_segment_it)->getInfoForLog();
auto current_file_segment_info
= current_file_segment_it == file_segments_holder->file_segments.end() ? "None" : (*current_file_segment_it)->getInfoForLog();
return fmt::format("Buffer path: {}, hash key: {}, file_offset_of_buffer_end: {}, internal buffer remaining read range: {}, "
"read_type: {}, last caller: {}, file segment info: {}",
remote_fs_object_path,
getHexUIntLowercase(cache_key),
file_offset_of_buffer_end,
implementation_buffer_read_range_str,
toString(read_type),
last_caller_id,
current_file_segment_info);
return fmt::format(
"Buffer path: {}, hash key: {}, file_offset_of_buffer_end: {}, internal buffer remaining read range: {}, "
"read_type: {}, last caller: {}, file segment info: {}",
remote_fs_object_path,
getHexUIntLowercase(cache_key),
file_offset_of_buffer_end,
implementation_buffer_read_range_str,
toString(read_type),
last_caller_id,
current_file_segment_info);
}
}

View File

@ -54,8 +54,6 @@ class MMappedFileCache;
struct ReadSettings
{
String current_query_id;
/// Method to use reading from local filesystem.
LocalFSReadMethod local_fs_method = LocalFSReadMethod::pread;
/// Method to use reading from remote filesystem.

View File

@ -3389,7 +3389,6 @@ OrdinaryBackgroundExecutorPtr Context::getCommonExecutor() const
ReadSettings Context::getReadSettings() const
{
ReadSettings res;
res.current_query_id = client_info.current_query_id;
std::string_view read_method_str = settings.local_filesystem_read_method.value;