This commit is contained in:
kssenii 2022-04-07 18:46:46 +02:00
parent 42c5721d9f
commit 5dce2f18b5
24 changed files with 139 additions and 148 deletions

View File

@ -57,7 +57,7 @@ String IFileCache::getPathInLocalCache(const Key & key)
return fs::path(cache_base_path) / key_str.substr(0, 3) / key_str; return fs::path(cache_base_path) / key_str.substr(0, 3) / key_str;
} }
bool IFileCache::shouldBypassCache() bool IFileCache::isReadOnly()
{ {
return !CurrentThread::isInitialized() return !CurrentThread::isInitialized()
|| !CurrentThread::get().getQueryContext() || !CurrentThread::get().getQueryContext()
@ -708,7 +708,7 @@ bool LRUFileCache::isLastFileSegmentHolder(
return cell->file_segment.use_count() == 2; return cell->file_segment.use_count() == 2;
} }
FileSegmentsHolder LRUFileCache::getAll() FileSegments LRUFileCache::getSnapshot() const
{ {
std::lock_guard cache_lock(mutex); std::lock_guard cache_lock(mutex);
@ -717,10 +717,10 @@ FileSegmentsHolder LRUFileCache::getAll()
for (const auto & [key, cells_by_offset] : files) for (const auto & [key, cells_by_offset] : files)
{ {
for (const auto & [offset, cell] : cells_by_offset) for (const auto & [offset, cell] : cells_by_offset)
file_segments.push_back(cell.file_segment); file_segments.push_back(FileSegment::getSnapshot(cell.file_segment));
} }
return FileSegmentsHolder(std::move(file_segments)); return file_segments;
} }
std::vector<String> LRUFileCache::tryGetCachePaths(const Key & key) std::vector<String> LRUFileCache::tryGetCachePaths(const Key & key)

View File

@ -44,7 +44,7 @@ public:
virtual void tryRemoveAll() = 0; virtual void tryRemoveAll() = 0;
static bool shouldBypassCache(); static bool isReadOnly();
/// Cache capacity in bytes. /// Cache capacity in bytes.
size_t capacity() const { return max_size; } size_t capacity() const { return max_size; }
@ -72,10 +72,10 @@ public:
*/ */
virtual FileSegmentsHolder getOrSet(const Key & key, size_t offset, size_t size) = 0; virtual FileSegmentsHolder getOrSet(const Key & key, size_t offset, size_t size) = 0;
virtual FileSegmentsHolder getAll() = 0;
virtual FileSegmentsHolder setDownloading(const Key & key, size_t offset, size_t size) = 0; virtual FileSegmentsHolder setDownloading(const Key & key, size_t offset, size_t size) = 0;
virtual FileSegments getSnapshot() const = 0;
/// For debug. /// For debug.
virtual String dumpStructure(const Key & key) = 0; virtual String dumpStructure(const Key & key) = 0;
@ -124,7 +124,7 @@ public:
FileSegmentsHolder getOrSet(const Key & key, size_t offset, size_t size) override; FileSegmentsHolder getOrSet(const Key & key, size_t offset, size_t size) override;
FileSegmentsHolder getAll() override; FileSegments getSnapshot() const override;
FileSegmentsHolder setDownloading(const Key & key, size_t offset, size_t size) override; FileSegmentsHolder setDownloading(const Key & key, size_t offset, size_t size) override;

View File

@ -2,7 +2,7 @@
#include <Common/FileCache_fwd.h> #include <Common/FileCache_fwd.h>
namespace Poco { namespace Util { class AbstractConfiguration; }} namespace Poco { namespace Util { class AbstractConfiguration; } }
namespace DB namespace DB
{ {

View File

@ -73,6 +73,12 @@ size_t FileSegment::getDownloadOffset() const
return range().left + getDownloadedSize(segment_lock); return range().left + getDownloadedSize(segment_lock);
} }
/// Returns the downloaded size of this file segment.
/// Self-locking overload: acquires the segment mutex and delegates to the overload
/// that takes an already-held segment lock, so callers must NOT hold the mutex here.
size_t FileSegment::getDownloadedSize() const
{
std::lock_guard segment_lock(mutex);
return getDownloadedSize(segment_lock);
}
size_t FileSegment::getDownloadedSize(std::lock_guard<std::mutex> & /* segment_lock */) const size_t FileSegment::getDownloadedSize(std::lock_guard<std::mutex> & /* segment_lock */) const
{ {
if (download_state == State::DOWNLOADED) if (download_state == State::DOWNLOADED)
@ -84,24 +90,15 @@ size_t FileSegment::getDownloadedSize(std::lock_guard<std::mutex> & /* segment_l
String FileSegment::getCallerId() String FileSegment::getCallerId()
{ {
return getCallerIdImpl(false); return getCallerIdImpl();
} }
String FileSegment::getCallerIdImpl(bool allow_non_strict_checking) String FileSegment::getCallerIdImpl()
{ {
if (IFileCache::shouldBypassCache()) if (!CurrentThread::isInitialized()
{ || !CurrentThread::get().getQueryContext()
/// getCallerId() can be called from completeImpl(), which can be called from complete(). || CurrentThread::getQueryId().size == 0)
/// complete() is called from destructor of CachedReadBufferFromRemoteFS when there is no query id anymore. return "None:" + toString(getThreadId());
/// Allow non strict checking in this case. This works correctly as if getCallerIdImpl() is called from destructor,
/// then we know that caller is not a downloader, because downloader is reset each nextImpl() call either
/// manually or via SCOPE_EXIT.
if (allow_non_strict_checking)
return "None";
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Cannot use cache without query id");
}
return CurrentThread::getQueryId().toString() + ":" + toString(getThreadId()); return CurrentThread::getQueryId().toString() + ":" + toString(getThreadId());
} }
@ -244,15 +241,9 @@ void FileSegment::write(const char * from, size_t size, size_t offset_)
{ {
std::lock_guard segment_lock(mutex); std::lock_guard segment_lock(mutex);
auto info = getInfoForLogImpl(segment_lock); wrapWithCacheInfo(e, "while writing into cache", segment_lock);
e.addMessage("while writing into cache, info: " + info);
LOG_ERROR(log, "Failed to write to cache. File segment info: {}", info); setDownloadFailed(segment_lock);
download_state = State::PARTIALLY_DOWNLOADED_NO_CONTINUATION;
cache_writer->finalize();
cache_writer.reset();
cv.notify_all(); cv.notify_all();
@ -265,7 +256,7 @@ void FileSegment::write(const char * from, size_t size, size_t offset_)
void FileSegment::writeInMemory(const char * from, size_t size) void FileSegment::writeInMemory(const char * from, size_t size)
{ {
if (!size) if (!size)
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Writing zero size is not allowed"); throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Attempt to write zero size cache file");
if (availableSize() < size) if (availableSize() < size)
throw Exception( throw Exception(
@ -284,14 +275,13 @@ void FileSegment::writeInMemory(const char * from, size_t size)
{ {
cache_writer->write(from, size); cache_writer->write(from, size);
} }
catch (...) catch (Exception & e)
{ {
LOG_ERROR(log, "Failed to write to cache. File segment info: {}", getInfoForLogImpl(segment_lock)); wrapWithCacheInfo(e, "while writing into cache", segment_lock);
download_state = State::PARTIALLY_DOWNLOADED_NO_CONTINUATION; setDownloadFailed(segment_lock);
cache_writer->finalize(); cv.notify_all();
cache_writer.reset();
throw; throw;
} }
@ -313,23 +303,23 @@ size_t FileSegment::finalizeWrite()
{ {
cache_writer->next(); cache_writer->next();
} }
catch (...) catch (Exception & e)
{ {
download_state = State::PARTIALLY_DOWNLOADED_NO_CONTINUATION; wrapWithCacheInfo(e, "while writing into cache", segment_lock);
cache_writer->finalize(); setDownloadFailed(segment_lock);
cache_writer.reset();
cv.notify_all();
throw; throw;
} }
downloaded_size += size; downloaded_size += size;
cache_writer.reset();
downloader_id.clear();
download_state = State::DOWNLOADED;
if (downloaded_size != range().size()) if (downloaded_size != range().size())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected {} == {}", downloaded_size, range().size()); throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected downloaded size to equal file segment size ({} == {})", downloaded_size, range().size());
setDownloaded(segment_lock);
return size; return size;
} }
@ -398,6 +388,20 @@ void FileSegment::setDownloaded(std::lock_guard<std::mutex> & /* segment_lock */
{ {
download_state = State::DOWNLOADED; download_state = State::DOWNLOADED;
is_downloaded = true; is_downloaded = true;
downloader_id.clear();
if (cache_writer)
{
cache_writer->finalize();
cache_writer.reset();
remote_file_reader.reset();
}
}
void FileSegment::setDownloadFailed(std::lock_guard<std::mutex> & /* segment_lock */)
{
download_state = State::PARTIALLY_DOWNLOADED_NO_CONTINUATION;
downloader_id.clear();
if (cache_writer) if (cache_writer)
{ {
@ -455,7 +459,7 @@ void FileSegment::complete(State state)
} }
catch (...) catch (...)
{ {
if (!downloader_id.empty() && downloader_id == getCallerIdImpl(true)) if (!downloader_id.empty() && downloader_id == getCallerIdImpl())
downloader_id.clear(); downloader_id.clear();
cv.notify_all(); cv.notify_all();
@ -480,7 +484,7 @@ void FileSegment::complete(std::lock_guard<std::mutex> & cache_lock)
/// Segment state can be changed from DOWNLOADING or EMPTY only if the caller is the /// Segment state can be changed from DOWNLOADING or EMPTY only if the caller is the
/// downloader or the only owner of the segment. /// downloader or the only owner of the segment.
bool can_update_segment_state = downloader_id == getCallerIdImpl(true) bool can_update_segment_state = downloader_id == getCallerIdImpl()
|| cache->isLastFileSegmentHolder(key(), offset(), cache_lock, segment_lock); || cache->isLastFileSegmentHolder(key(), offset(), cache_lock, segment_lock);
if (can_update_segment_state) if (can_update_segment_state)
@ -489,11 +493,11 @@ void FileSegment::complete(std::lock_guard<std::mutex> & cache_lock)
try try
{ {
completeImpl(cache_lock, segment_lock, /* allow_non_strict_checking */true); completeImpl(cache_lock, segment_lock);
} }
catch (...) catch (...)
{ {
if (!downloader_id.empty() && downloader_id == getCallerIdImpl(true)) if (!downloader_id.empty() && downloader_id == getCallerIdImpl())
downloader_id.clear(); downloader_id.clear();
cv.notify_all(); cv.notify_all();
@ -503,7 +507,7 @@ void FileSegment::complete(std::lock_guard<std::mutex> & cache_lock)
cv.notify_all(); cv.notify_all();
} }
void FileSegment::completeImpl(std::lock_guard<std::mutex> & cache_lock, std::lock_guard<std::mutex> & segment_lock, bool allow_non_strict_checking) void FileSegment::completeImpl(std::lock_guard<std::mutex> & cache_lock, std::lock_guard<std::mutex> & segment_lock)
{ {
bool is_last_holder = cache->isLastFileSegmentHolder(key(), offset(), cache_lock, segment_lock); bool is_last_holder = cache->isLastFileSegmentHolder(key(), offset(), cache_lock, segment_lock);
@ -539,7 +543,7 @@ void FileSegment::completeImpl(std::lock_guard<std::mutex> & cache_lock, std::lo
} }
} }
if (!downloader_id.empty() && (downloader_id == getCallerIdImpl(allow_non_strict_checking) || is_last_holder)) if (!downloader_id.empty() && (downloader_id == getCallerIdImpl() || is_last_holder))
{ {
LOG_TEST(log, "Clearing downloader id: {}, current state: {}", downloader_id, stateToString(download_state)); LOG_TEST(log, "Clearing downloader id: {}, current state: {}", downloader_id, stateToString(download_state));
downloader_id.clear(); downloader_id.clear();
@ -566,6 +570,11 @@ String FileSegment::getInfoForLogImpl(std::lock_guard<std::mutex> & segment_lock
return info.str(); return info.str();
} }
/// Adds context about this file segment to exception `e`.
/// The appended text has the form "<message>, current cache state: <info>", where <info>
/// is produced by getInfoForLogImpl(). `segment_lock` is forwarded to getInfoForLogImpl(),
/// i.e. the caller is expected to already hold the segment mutex.
void FileSegment::wrapWithCacheInfo(Exception & e, const String & message, std::lock_guard<std::mutex> & segment_lock) const
{
e.addMessage(fmt::format("{}, current cache state: {}", message, getInfoForLogImpl(segment_lock)));
}
String FileSegment::stateToString(FileSegment::State state) String FileSegment::stateToString(FileSegment::State state)
{ {
switch (state) switch (state)
@ -599,6 +608,22 @@ void FileSegment::assertCorrectnessImpl(std::lock_guard<std::mutex> & /* segment
assert(download_state != FileSegment::State::DOWNLOADED || std::filesystem::file_size(cache->getPathInLocalCache(key(), offset())) > 0); assert(download_state != FileSegment::State::DOWNLOADED || std::filesystem::file_size(cache->getPathInLocalCache(key(), offset())) > 0);
} }
/// Builds a new, separate FileSegment object that copies the observable state of `file_segment`:
/// offset, range size, key, state, hits count, reference count and downloaded size.
/// The returned object is a fresh allocation, not another reference to the original segment.
FileSegmentPtr FileSegment::getSnapshot(const FileSegmentPtr & file_segment)
{
/// NOTE(review): the fourth constructor argument is nullptr — presumably the owning cache
/// pointer, in which case members that dereference the cache must not be called on a
/// snapshot. Confirm against the FileSegment constructor.
auto snapshot = std::make_shared<FileSegment>(
file_segment->offset(),
file_segment->range().size(),
file_segment->key(),
nullptr,
file_segment->state());

/// Fields are read one by one through separate accessors (getDownloadedSize() takes the
/// segment mutex on its own), so the values are not captured under a single lock here.
snapshot->hits_count = file_segment->getHitsCount();
/// Reference count is the use_count() of the shared pointer that was passed in,
/// sampled at the moment of the call.
snapshot->ref_count = file_segment.use_count();
snapshot->downloaded_size = file_segment->getDownloadedSize();

return snapshot;
}
FileSegmentsHolder::~FileSegmentsHolder() FileSegmentsHolder::~FileSegmentsHolder()
{ {
/// In CacheableReadBufferFromRemoteFS file segment's downloader removes file segments from /// In CacheableReadBufferFromRemoteFS file segment's downloader removes file segments from

View File

@ -97,6 +97,11 @@ public:
void write(const char * from, size_t size, size_t offset_); void write(const char * from, size_t size, size_t offset_);
/**
* writeInMemory and finalizeWrite are used together to write a single file with delay.
* Both can be called only once, one after another. Used for writing cache via threadpool
* on write operations. TODO: this solution is temporary, until a separate cache layer is added.
*/
void writeInMemory(const char * from, size_t size); void writeInMemory(const char * from, size_t size);
size_t finalizeWrite(); size_t finalizeWrite();
@ -121,18 +126,24 @@ public:
size_t getDownloadOffset() const; size_t getDownloadOffset() const;
size_t getDownloadedSize() const;
void completeBatchAndResetDownloader(); void completeBatchAndResetDownloader();
void complete(State state); void complete(State state);
String getInfoForLog() const; String getInfoForLog() const;
size_t hits() const { return hits_num; } size_t getHitsCount() const { return hits_count; }
void hit() { ++hits_num; } size_t getRefCount() const { return ref_count; }
void incrementHitsCount() { ++hits_count; }
void assertCorrectness() const; void assertCorrectness() const;
static FileSegmentPtr getSnapshot(const FileSegmentPtr & file_segment);
private: private:
size_t availableSize() const { return reserved_size - downloaded_size; } size_t availableSize() const { return reserved_size - downloaded_size; }
@ -141,6 +152,9 @@ private:
void assertCorrectnessImpl(std::lock_guard<std::mutex> & segment_lock) const; void assertCorrectnessImpl(std::lock_guard<std::mutex> & segment_lock) const;
void setDownloaded(std::lock_guard<std::mutex> & segment_lock); void setDownloaded(std::lock_guard<std::mutex> & segment_lock);
void setDownloadFailed(std::lock_guard<std::mutex> & segment_lock);
void wrapWithCacheInfo(Exception & e, const String & message, std::lock_guard<std::mutex> & segment_lock) const;
bool lastFileSegmentHolder() const; bool lastFileSegmentHolder() const;
@ -152,9 +166,9 @@ private:
void completeImpl( void completeImpl(
std::lock_guard<std::mutex> & cache_lock, std::lock_guard<std::mutex> & cache_lock,
std::lock_guard<std::mutex> & segment_lock, bool allow_non_strict_checking = false); std::lock_guard<std::mutex> & segment_lock);
static String getCallerIdImpl(bool allow_non_strict_checking = false); static String getCallerIdImpl();
void resetDownloaderImpl(std::lock_guard<std::mutex> & segment_lock); void resetDownloaderImpl(std::lock_guard<std::mutex> & segment_lock);
@ -188,7 +202,8 @@ private:
bool detached = false; bool detached = false;
std::atomic<bool> is_downloaded{false}; std::atomic<bool> is_downloaded{false};
std::atomic<size_t> hits_num = 0; /// cache hits. std::atomic<size_t> hits_count = 0; /// cache hits.
std::atomic<size_t> ref_count = 0; /// Used for getting snapshot state
}; };
struct FileSegmentsHolder : private boost::noncopyable struct FileSegmentsHolder : private boost::noncopyable

View File

@ -206,6 +206,8 @@ DiskCacheWrapper::writeFile(const String & path, size_t buf_size, WriteMode mode
return DiskDecorator::writeFile(path, buf_size, mode, settings); return DiskDecorator::writeFile(path, buf_size, mode, settings);
WriteSettings current_settings = settings; WriteSettings current_settings = settings;
/// There are two different cache implementations. Disable the second one if the first is enabled.
/// The first will soon be removed; this disabling is temporary.
current_settings.enable_filesystem_cache_on_write_operations = false; current_settings.enable_filesystem_cache_on_write_operations = false;
LOG_TEST(log, "Write file {} to cache", backQuote(path)); LOG_TEST(log, "Write file {} to cache", backQuote(path));

View File

@ -77,7 +77,7 @@ public:
UInt64 getTotalSpace() const final override { return std::numeric_limits<UInt64>::max(); } UInt64 getTotalSpace() const final override { return std::numeric_limits<UInt64>::max(); }
UInt64 getAvailableSpace() const final override { return std::numeric_limits<UInt64>::max(); } UInt64 getAvailableSpace() const final override { return std::numeric_limits<UInt64>::max(); }
UInt64 getUnreservedSpace() const final override { return std::numeric_limits<UInt64>::max(); } UInt64 getUnreservedSpace() const final override { return std::numeric_limits<UInt64>::max(); }
/// Read-only part /// Read-only part

View File

@ -343,9 +343,9 @@ void IDiskRemote::removeMetadataRecursive(const String & path, RemoteFSPathKeepe
} }
} }
std::vector<String> IDiskRemote::getRemotePaths(const String & path) const std::vector<String> IDiskRemote::getRemotePaths(const String & local_path) const
{ {
auto metadata = readMetadata(path); auto metadata = readMetadata(local_path);
std::vector<String> remote_paths; std::vector<String> remote_paths;
for (const auto & [remote_path, _] : metadata.remote_fs_objects) for (const auto & [remote_path, _] : metadata.remote_fs_objects)
@ -354,16 +354,16 @@ std::vector<String> IDiskRemote::getRemotePaths(const String & path) const
return remote_paths; return remote_paths;
} }
void IDiskRemote::getRemotePathsRecursive(const String & path, std::vector<LocalPathWithRemotePaths> & paths_map) void IDiskRemote::getRemotePathsRecursive(const String & local_path, std::vector<LocalPathWithRemotePaths> & paths_map)
{ {
if (metadata_disk->isFile(path)) if (metadata_disk->isFile(local_path))
{ {
paths_map.emplace_back(path, getRemotePaths(path)); paths_map.emplace_back(local_path, getRemotePaths(local_path));
} }
else else
{ {
for (auto it = iterateDirectory(path); it->isValid(); it->next()) for (auto it = iterateDirectory(local_path); it->isValid(); it->next())
IDiskRemote::getRemotePathsRecursive(fs::path(path) / it->name(), paths_map); IDiskRemote::getRemotePathsRecursive(fs::path(local_path) / it->name(), paths_map);
} }
} }

View File

@ -68,9 +68,9 @@ public:
String getCacheBasePath() const final override; String getCacheBasePath() const final override;
std::vector<String> getRemotePaths(const String & path) const final override; std::vector<String> getRemotePaths(const String & local_path) const final override;
void getRemotePathsRecursive(const String & path, std::vector<LocalPathWithRemotePaths> & paths_map) override; void getRemotePathsRecursive(const String & local_path, std::vector<LocalPathWithRemotePaths> & paths_map) override;
/// Methods for working with metadata. For some operations (like hardlink /// Methods for working with metadata. For some operations (like hardlink
/// creation) metadata can be updated concurrently from multiple threads /// creation) metadata can be updated concurrently from multiple threads

View File

@ -389,7 +389,7 @@ bool CachedReadBufferFromRemoteFS::completeFileSegmentAndGetNext()
implementation_buffer = getImplementationBuffer(*current_file_segment_it); implementation_buffer = getImplementationBuffer(*current_file_segment_it);
if (read_type == ReadType::CACHED) if (read_type == ReadType::CACHED)
(*current_file_segment_it)->hit(); (*current_file_segment_it)->incrementHitsCount();
LOG_TEST(log, "New segment: {}", (*current_file_segment_it)->range().toString()); LOG_TEST(log, "New segment: {}", (*current_file_segment_it)->range().toString());
return true; return true;
@ -573,8 +573,6 @@ bool CachedReadBufferFromRemoteFS::nextImpl()
bool CachedReadBufferFromRemoteFS::nextImplStep() bool CachedReadBufferFromRemoteFS::nextImplStep()
{ {
assertCacheAllowed();
last_caller_id = FileSegment::getCallerId(); last_caller_id = FileSegment::getCallerId();
if (!initialized) if (!initialized)
@ -623,7 +621,7 @@ bool CachedReadBufferFromRemoteFS::nextImplStep()
implementation_buffer = getImplementationBuffer(*current_file_segment_it); implementation_buffer = getImplementationBuffer(*current_file_segment_it);
if (read_type == ReadType::CACHED) if (read_type == ReadType::CACHED)
(*current_file_segment_it)->hit(); (*current_file_segment_it)->incrementHitsCount();
} }
assert(!internal_buffer.empty()); assert(!internal_buffer.empty());
@ -820,12 +818,6 @@ std::optional<size_t> CachedReadBufferFromRemoteFS::getLastNonDownloadedOffset()
return std::nullopt; return std::nullopt;
} }
void CachedReadBufferFromRemoteFS::assertCacheAllowed() const
{
if (IFileCache::shouldBypassCache() && !settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cache used when not allowed");
}
String CachedReadBufferFromRemoteFS::getInfoForLog() String CachedReadBufferFromRemoteFS::getInfoForLog()
{ {
auto implementation_buffer_read_range_str = auto implementation_buffer_read_range_str =

View File

@ -50,8 +50,6 @@ private:
bool nextImplStep(); bool nextImplStep();
void assertCacheAllowed() const;
enum class ReadType enum class ReadType
{ {
CACHED, CACHED,

View File

@ -38,7 +38,7 @@ SeekableReadBufferPtr ReadBufferFromS3Gather::createImplementationBuffer(const S
current_path = path; current_path = path;
auto cache = settings.remote_fs_cache; auto cache = settings.remote_fs_cache;
bool with_cache = cache && settings.enable_filesystem_cache && !IFileCache::shouldBypassCache(); bool with_cache = cache && settings.enable_filesystem_cache;
auto remote_file_reader_creator = [=, this]() auto remote_file_reader_creator = [=, this]()
{ {

View File

@ -1,6 +1,5 @@
#include "ThreadPoolRemoteFSReader.h" #include "ThreadPoolRemoteFSReader.h"
#include <Core/UUID.h>
#include <Common/Exception.h> #include <Common/Exception.h>
#include <Common/ProfileEvents.h> #include <Common/ProfileEvents.h>
#include <Common/CurrentMetrics.h> #include <Common/CurrentMetrics.h>
@ -51,25 +50,6 @@ std::future<IAsynchronousReader::Result> ThreadPoolRemoteFSReader::submit(Reques
if (CurrentThread::isInitialized()) if (CurrentThread::isInitialized())
query_context = CurrentThread::get().getQueryContext(); query_context = CurrentThread::get().getQueryContext();
if (!query_context)
{
if (!shared_query_context)
{
ContextPtr global_context = CurrentThread::isInitialized() ? CurrentThread::get().getGlobalContext() : nullptr;
if (global_context)
{
shared_query_context = Context::createCopy(global_context);
shared_query_context->makeQueryContext();
}
}
if (shared_query_context)
{
shared_query_context->setCurrentQueryId(toString(UUIDHelpers::generateV4()));
query_context = shared_query_context;
}
}
auto task = std::make_shared<std::packaged_task<Result()>>([request, running_group, query_context] auto task = std::make_shared<std::packaged_task<Result()>>([request, running_group, query_context]
{ {
ThreadStatus thread_status; ThreadStatus thread_status;

View File

@ -15,7 +15,6 @@ class ThreadPoolRemoteFSReader : public IAsynchronousReader
private: private:
ThreadPool pool; ThreadPool pool;
ContextMutablePtr shared_query_context;
public: public:
ThreadPoolRemoteFSReader(size_t pool_size, size_t queue_size_); ThreadPoolRemoteFSReader(size_t pool_size, size_t queue_size_);

View File

@ -230,7 +230,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskS3::readFile(const String & path, co
ReadSettings disk_read_settings{read_settings}; ReadSettings disk_read_settings{read_settings};
if (cache) if (cache)
{ {
if (IFileCache::shouldBypassCache()) if (IFileCache::isReadOnly())
disk_read_settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache = true; disk_read_settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache = true;
disk_read_settings.remote_fs_cache = cache; disk_read_settings.remote_fs_cache = cache;
@ -272,7 +272,8 @@ std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path,
LOG_TRACE(log, "{} to file by path: {}. S3 path: {}", LOG_TRACE(log, "{} to file by path: {}. S3 path: {}",
mode == WriteMode::Rewrite ? "Write" : "Append", backQuote(metadata_disk->getPath() + path), remote_fs_root_path + blob_name); mode == WriteMode::Rewrite ? "Write" : "Append", backQuote(metadata_disk->getPath() + path), remote_fs_root_path + blob_name);
bool cache_on_insert = fs::path(path).extension() != ".tmp" bool cache_on_write = cache
&& fs::path(path).extension() != ".tmp"
&& write_settings.enable_filesystem_cache_on_write_operations && write_settings.enable_filesystem_cache_on_write_operations
&& FileCacheFactory::instance().getSettings(getCacheBasePath()).cache_on_write_operations; && FileCacheFactory::instance().getSettings(getCacheBasePath()).cache_on_write_operations;
@ -285,7 +286,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path,
settings->s3_upload_part_size_multiply_parts_count_threshold, settings->s3_upload_part_size_multiply_parts_count_threshold,
settings->s3_max_single_part_upload_size, settings->s3_max_single_part_upload_size,
std::move(object_metadata), std::move(object_metadata),
buf_size, threadPoolCallbackRunner(getThreadPoolWriter()), blob_name, cache_on_insert ? cache : nullptr); buf_size, threadPoolCallbackRunner(getThreadPoolWriter()), blob_name, cache_on_write ? cache : nullptr);
auto create_metadata_callback = [this, path, blob_name, mode] (size_t count) auto create_metadata_callback = [this, path, blob_name, mode] (size_t count)
{ {

View File

@ -33,7 +33,7 @@ bool ParallelReadBuffer::addReaderToPool(std::unique_lock<std::mutex> & /*buffer
auto worker = read_workers.emplace_back(std::make_shared<ReadWorker>(std::move(reader))); auto worker = read_workers.emplace_back(std::make_shared<ReadWorker>(std::move(reader)));
schedule([this, worker = std::move(worker)]() mutable { readerThreadFunction(std::move(worker)); }, nullptr); schedule([this, worker = std::move(worker)]() mutable { readerThreadFunction(std::move(worker)); });
return true; return true;
} }

View File

@ -100,28 +100,6 @@ void WriteBufferFromS3::nextImpl()
? CurrentThread::get().getThreadGroup() ? CurrentThread::get().getThreadGroup()
: MainThreadStatus::getInstance().getThreadGroup(); : MainThreadStatus::getInstance().getThreadGroup();
if (CurrentThread::isInitialized())
query_context = CurrentThread::get().getQueryContext();
if (!query_context)
{
if (!shared_query_context)
{
ContextPtr global_context = CurrentThread::isInitialized() ? CurrentThread::get().getGlobalContext() : nullptr;
if (global_context)
{
shared_query_context = Context::createCopy(global_context);
shared_query_context->makeQueryContext();
}
}
if (shared_query_context)
{
shared_query_context->setCurrentQueryId(toString(UUIDHelpers::generateV4()));
query_context = shared_query_context;
}
}
if (cacheEnabled()) if (cacheEnabled())
{ {
if (blob_name.empty()) if (blob_name.empty())
@ -132,8 +110,10 @@ void WriteBufferFromS3::nextImpl()
current_download_offset += size; current_download_offset += size;
size_t remaining_size = size; size_t remaining_size = size;
for (const auto & file_segment : file_segments_holder->file_segments) auto & file_segments = file_segments_holder->file_segments;
for (auto file_segment_it = file_segments.begin(); file_segment_it != file_segments.end(); ++file_segment_it)
{ {
auto & file_segment = *file_segment_it;
size_t current_size = std::min(file_segment->range().size(), remaining_size); size_t current_size = std::min(file_segment->range().size(), remaining_size);
remaining_size -= current_size; remaining_size -= current_size;
@ -143,6 +123,7 @@ void WriteBufferFromS3::nextImpl()
} }
else else
{ {
file_segments.erase(file_segment_it, file_segments.end());
break; break;
} }
} }
@ -190,7 +171,7 @@ WriteBufferFromS3::~WriteBufferFromS3()
bool WriteBufferFromS3::cacheEnabled() const bool WriteBufferFromS3::cacheEnabled() const
{ {
return cache != nullptr && !IFileCache::shouldBypassCache(); return cache != nullptr;
} }
void WriteBufferFromS3::preFinalize() void WriteBufferFromS3::preFinalize()
@ -317,7 +298,7 @@ void WriteBufferFromS3::writePart()
/// Releasing lock and condvar notification. /// Releasing lock and condvar notification.
bg_tasks_condvar.notify_one(); bg_tasks_condvar.notify_one();
} }
}, query_context); });
} }
else else
{ {
@ -454,7 +435,7 @@ void WriteBufferFromS3::makeSinglepartUpload()
/// Releasing lock and condvar notification. /// Releasing lock and condvar notification.
bg_tasks_condvar.notify_one(); bg_tasks_condvar.notify_one();
} }
}, query_context); });
} }
else else
{ {

View File

@ -33,7 +33,7 @@ namespace Aws::S3::Model
namespace DB namespace DB
{ {
using ScheduleFunc = std::function<void(std::function<void()>, ContextPtr)>; using ScheduleFunc = std::function<void(std::function<void()>)>;
class WriteBufferFromFile; class WriteBufferFromFile;
/** /**
@ -128,8 +128,6 @@ private:
size_t current_download_offset = 0; size_t current_download_offset = 0;
std::optional<FileSegmentsHolder> file_segments_holder; std::optional<FileSegmentsHolder> file_segments_holder;
static void finalizeCacheIfNeeded(std::optional<FileSegmentsHolder> &); static void finalizeCacheIfNeeded(std::optional<FileSegmentsHolder> &);
ContextMutablePtr shared_query_context;
ContextPtr query_context;
}; };
} }

View File

@ -3,6 +3,7 @@
namespace DB namespace DB
{ {
/// Settings to be passed to IDisk::writeFile()
struct WriteSettings struct WriteSettings
{ {
bool enable_filesystem_cache_on_write_operations = false; bool enable_filesystem_cache_on_write_operations = false;

View File

@ -9,19 +9,14 @@ namespace DB
CallbackRunner threadPoolCallbackRunner(ThreadPool & pool) CallbackRunner threadPoolCallbackRunner(ThreadPool & pool)
{ {
return [pool = &pool, thread_group = CurrentThread::getGroup()](auto callback, ContextPtr query_context) mutable return [pool = &pool, thread_group = CurrentThread::getGroup()](auto callback) mutable
{ {
pool->scheduleOrThrow( pool->scheduleOrThrow(
[&, callback = std::move(callback), thread_group, query_context]() [&, callback = std::move(callback), thread_group]()
{ {
if (thread_group) if (thread_group)
CurrentThread::attachTo(thread_group); CurrentThread::attachTo(thread_group);
std::optional<CurrentThread::QueryScope> query_scope;
if (query_context && !CurrentThread::get().getQueryContext())
query_scope.emplace(query_context);
SCOPE_EXIT_SAFE({ SCOPE_EXIT_SAFE({
if (thread_group) if (thread_group)
CurrentThread::detachQueryIfNotDetached(); CurrentThread::detachQueryIfNotDetached();

View File

@ -7,7 +7,7 @@ namespace DB
{ {
/// High-order function to run callbacks (functions with 'void()' signature) somewhere asynchronously /// High-order function to run callbacks (functions with 'void()' signature) somewhere asynchronously
using CallbackRunner = std::function<void(std::function<void()>, ContextPtr)>; using CallbackRunner = std::function<void(std::function<void()>)>;
/// Creates CallbackRunner that runs every callback with 'pool->scheduleOrThrow()' /// Creates CallbackRunner that runs every callback with 'pool->scheduleOrThrow()'
CallbackRunner threadPoolCallbackRunner(ThreadPool & pool); CallbackRunner threadPoolCallbackRunner(ThreadPool & pool);

View File

@ -27,7 +27,7 @@ MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream(
MergeTreeWriterSettings writer_settings( MergeTreeWriterSettings writer_settings(
global_settings, global_settings,
WriteSettings{}, data_part->storage.getContext()->getWriteSettings(),
storage_settings, storage_settings,
index_granularity_info ? index_granularity_info->is_adaptive : data_part->storage.canUseAdaptiveGranularity(), index_granularity_info ? index_granularity_info->is_adaptive : data_part->storage.canUseAdaptiveGranularity(),
/* rewrite_primary_key = */false); /* rewrite_primary_key = */false);

View File

@ -22,6 +22,7 @@ NamesAndTypesList StorageSystemFilesystemCache::getNamesAndTypes()
{"state", std::make_shared<DataTypeString>()}, {"state", std::make_shared<DataTypeString>()},
{"cache_hits", std::make_shared<DataTypeUInt64>()}, {"cache_hits", std::make_shared<DataTypeUInt64>()},
{"references", std::make_shared<DataTypeUInt64>()}, {"references", std::make_shared<DataTypeUInt64>()},
{"downloaded_size", std::make_shared<DataTypeUInt64>()},
}; };
} }
@ -37,9 +38,9 @@ void StorageSystemFilesystemCache::fillData(MutableColumns & res_columns, Contex
for (const auto & [cache_base_path, cache_data] : caches) for (const auto & [cache_base_path, cache_data] : caches)
{ {
const auto & cache = cache_data.cache; const auto & cache = cache_data.cache;
auto holder = cache->getAll(); auto file_segments = cache->getSnapshot();
for (const auto & file_segment : holder.file_segments) for (const auto & file_segment : file_segments)
{ {
res_columns[0]->insert(cache_base_path); res_columns[0]->insert(cache_base_path);
res_columns[1]->insert(cache->getPathInLocalCache(file_segment->key(), file_segment->offset())); res_columns[1]->insert(cache->getPathInLocalCache(file_segment->key(), file_segment->offset()));
@ -49,8 +50,9 @@ void StorageSystemFilesystemCache::fillData(MutableColumns & res_columns, Contex
res_columns[3]->insert(range.right); res_columns[3]->insert(range.right);
res_columns[4]->insert(range.size()); res_columns[4]->insert(range.size());
res_columns[5]->insert(FileSegment::stateToString(file_segment->state())); res_columns[5]->insert(FileSegment::stateToString(file_segment->state()));
res_columns[6]->insert(file_segment->hits()); res_columns[6]->insert(file_segment->getHitsCount());
res_columns[7]->insert(file_segment.use_count()); res_columns[7]->insert(file_segment->getRefCount());
res_columns[8]->insert(file_segment->getDownloadedSize());
} }
} }
} }

View File

@ -7,12 +7,14 @@ namespace DB
{ {
/** /**
* Usage example. How to get the mapping from local paths to remote paths:
* SELECT * SELECT
* cache_path, * cache_path,
* cache_hits, * cache_hits,
* remote_path, * remote_path,
* local_path, * local_path,
* file_segment_range, * file_segment_range_begin,
* file_segment_range_end,
* size, * size,
* state * state
* FROM * FROM