Rewrite locking in cache - intermediate state

This commit is contained in:
kssenii 2023-01-04 18:56:45 +01:00
parent 1958d1bc20
commit 25f8e8467a
28 changed files with 2120 additions and 1539 deletions

View File

@ -341,10 +341,45 @@ The server successfully detected this situation and will download merged part fr
M(CachedReadBufferReadFromCacheMicroseconds, "Time reading from filesystem cache") \
M(CachedReadBufferReadFromSourceBytes, "Bytes read from filesystem cache source (from remote fs, etc)") \
M(CachedReadBufferReadFromCacheBytes, "Bytes read from filesystem cache") \
M(CachedReadBufferGetFileSegments, "Bytes read from filesystem cache") \
M(CachedReadBufferGetCacheReadBuffer, "Bytes read from filesystem cache") \
M(CachedReadBufferGetRemoteReadBuffer, "Bytes read from filesystem cache") \
M(CachedReadBufferPredownload, "Bytes read from filesystem cache") \
M(CachedReadBufferCacheWriteBytes, "Bytes written from source (remote fs, etc) to filesystem cache") \
M(CachedReadBufferCacheWriteMicroseconds, "Time spent writing data into filesystem cache") \
M(CachedWriteBufferCacheWriteBytes, "Bytes written from source (remote fs, etc) to filesystem cache") \
M(CachedWriteBufferCacheWriteMicroseconds, "Time spent writing data into filesystem cache") \
M(CachedReadBufferWaitForReaderInit, "Time") \
M(CachedReadBufferWaitForReaderInit1, "Time") \
M(CachedReadBufferWaitForReaderInit2, "Time") \
M(CachedReadBufferGetImplementationBuffer, "Time") \
M(CachedReadBufferInitialize, "Time") \
M(CachedReadBufferGetOrSet, "Time") \
M(CachedReadBufferRead, "Time") \
M(CachedReadBufferRead1, "Time") \
M(CachedReadBufferRead2, "Time") \
M(CachedReadBufferRead3, "Time") \
M(CachedReadBufferRead4, "Time") \
M(CachedReadBufferRead5, "Time") \
M(CachedReadBufferRead6, "Time") \
M(CachedReadBufferRead7, "Time") \
M(CachedReadBufferRead8, "Time") \
M(CachedReadBufferRead9, "Time") \
M(CachedReadBufferRead10, "Time") \
M(FileCacheGetOrSet, "Time") \
M(FileCacheGetOrSet1, "Time") \
M(FileCacheGetOrSet2, "Time") \
M(FileCacheGetOrSet3, "Time") \
M(FileCacheGetOrSet4, "Time") \
M(FileCacheGetImpl, "Time") \
M(FileCacheGetImpl1, "Time") \
M(FileCacheGetImpl2, "Time") \
M(FileCacheGetImpl3, "Time") \
M(FileCacheGetImpl4, "Time") \
M(FileCacheGetImpl5, "Time") \
M(FileCacheLockKey, "Time") \
M(FileCacheCreateFile1, "Time") \
M(FileCacheCreateFile2, "Time") \
\
M(RemoteFSSeeks, "Total number of seeks for async buffer") \
M(RemoteFSPrefetches, "Number of prefetches made with asynchronous reading from remote filesystem") \
@ -358,6 +393,18 @@ The server successfully detected this situation and will download merged part fr
\
M(ThreadpoolReaderTaskMicroseconds, "Time spent getting the data in asynchronous reading") \
M(ThreadpoolReaderReadBytes, "Bytes read from a threadpool task in asynchronous reading") \
M(ThreadpoolSubmit, "Bytes read from a threadpool task in asynchronous reading") \
M(ThreadpoolSubmit2, "Bytes read from a threadpool task in asynchronous reading") \
M(ThreadpoolRun, "Bytes read from a threadpool task in asynchronous reading") \
M(ThreadpoolRun2, "Bytes read from a threadpool task in asynchronous reading") \
M(ThreadpoolRun3, "Bytes read from a threadpool task in asynchronous reading") \
M(ThreadpoolRun4, "Bytes read from a threadpool task in asynchronous reading") \
M(ThreadpoolRun5, "Bytes read from a threadpool task in asynchronous reading") \
M(ThreadpoolRun6, "Bytes read from a threadpool task in asynchronous reading") \
M(ThreadpoolRun7, "Bytes read from a threadpool task in asynchronous reading") \
M(ThreadpoolRun8, "Bytes read from a threadpool task in asynchronous reading") \
M(ThreadpoolRun9, "Bytes read from a threadpool task in asynchronous reading") \
M(ThreadpoolRun10, "Bytes read from a threadpool task in asynchronous reading") \
\
M(FileSegmentWaitReadBufferMicroseconds, "Metric per file segment. Time spend waiting for internal read buffer (includes cache waiting)") \
M(FileSegmentReadMicroseconds, "Metric per file segment. Time spend reading from file") \
@ -379,6 +426,7 @@ The server successfully detected this situation and will download merged part fr
M(ThreadPoolReaderPageCacheMissElapsedMicroseconds, "Time spent reading data inside the asynchronous job in ThreadPoolReader - when read was not done from page cache.") \
\
M(AsynchronousReadWaitMicroseconds, "Time spent in waiting for asynchronous reads.") \
M(AsynchronousRemoteReadWaitMicroseconds, "Time spent in waiting for asynchronous reads.") \
\
M(ExternalDataSourceLocalCacheReadBytes, "Bytes read from local cache buffer in RemoteReadBufferCache")\
\

File diff suppressed because it is too large Load Diff

View File

@ -62,26 +62,26 @@ public:
};
private:
void initialize(size_t offset, size_t size);
/**
* Return a list of file segments ordered in ascending order. This list represents
* a full contiguous interval (without holes).
*/
FileSegmentsHolderPtr getFileSegments(size_t offset, size_t size) const;
ImplementationBufferPtr getImplementationBuffer(FileSegmentPtr & file_segment);
ImplementationBufferPtr getImplementationBuffer(FileSegment & file_segment);
ImplementationBufferPtr getReadBufferForFileSegment(FileSegmentPtr & file_segment);
ImplementationBufferPtr getReadBufferForFileSegment(FileSegment & file_segment);
ImplementationBufferPtr getCacheReadBuffer(const FileSegment & file_segment) const;
ImplementationBufferPtr getCacheReadBuffer(FileSegment & file_segment) const;
std::optional<size_t> getLastNonDownloadedOffset() const;
ImplementationBufferPtr getRemoteReadBuffer(FileSegment & file_segment, ReadType read_type_);
bool updateImplementationBufferIfNeeded();
void predownload(FileSegmentPtr & file_segment);
void predownload(FileSegment & file_segment);
bool nextImplStep();
void assertCorrectness() const;
std::shared_ptr<ReadBufferFromFileBase> getRemoteFSReadBuffer(FileSegment & file_segment, ReadType read_type_);
size_t getTotalSizeToRead();
bool completeFileSegmentAndGetNext();
@ -108,8 +108,7 @@ private:
/// Remote read buffer, which can only be owned by current buffer.
FileSegment::RemoteFileReaderPtr remote_file_reader;
std::optional<FileSegmentsHolder> file_segments_holder;
FileSegments::iterator current_file_segment_it;
FileSegmentsHolderPtr file_segments;
ImplementationBufferPtr implementation_buffer;
bool initialized = false;
@ -131,8 +130,6 @@ private:
}
size_t first_offset = 0;
String nextimpl_step_log_info;
String last_caller_id;
String query_id;
bool enable_logging = false;

View File

@ -64,18 +64,20 @@ bool FileSegmentRangeWriter::write(const char * data, size_t size, size_t offset
offset, expected_write_offset);
}
auto & file_segments = file_segments_holder.file_segments;
FileSegment * file_segment;
if (file_segments.empty() || file_segments.back()->isDownloaded())
if (file_segments.empty() || file_segments.back().isDownloaded())
{
allocateFileSegment(expected_write_offset, is_persistent);
file_segment = &allocateFileSegment(expected_write_offset, is_persistent);
}
else
{
file_segment = &file_segments.back();
}
auto & file_segment = file_segments.back();
SCOPE_EXIT({
if (file_segments.back()->isDownloader())
file_segments.back()->completePartAndResetDownloader();
if (file_segments.back().isDownloader())
file_segments.back().completePartAndResetDownloader();
});
while (size > 0)
@ -84,7 +86,7 @@ bool FileSegmentRangeWriter::write(const char * data, size_t size, size_t offset
if (available_size == 0)
{
completeFileSegment(*file_segment);
file_segment = allocateFileSegment(expected_write_offset, is_persistent);
file_segment = &allocateFileSegment(expected_write_offset, is_persistent);
continue;
}
@ -100,7 +102,7 @@ bool FileSegmentRangeWriter::write(const char * data, size_t size, size_t offset
bool reserved = file_segment->reserve(size_to_write);
if (!reserved)
{
file_segment->completeWithState(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
file_segment->setBroken();
appendFilesystemCacheLog(*file_segment);
LOG_DEBUG(
@ -127,11 +129,10 @@ void FileSegmentRangeWriter::finalize()
if (finalized)
return;
auto & file_segments = file_segments_holder.file_segments;
if (file_segments.empty())
return;
completeFileSegment(*file_segments.back());
completeFileSegment(file_segments.back());
finalized = true;
}
@ -148,15 +149,13 @@ FileSegmentRangeWriter::~FileSegmentRangeWriter()
}
}
FileSegmentPtr & FileSegmentRangeWriter::allocateFileSegment(size_t offset, bool is_persistent)
FileSegment & FileSegmentRangeWriter::allocateFileSegment(size_t offset, bool is_persistent)
{
/**
* Allocate a new file segment starting `offset`.
* File segment capacity will equal `max_file_segment_size`, but actual size is 0.
*/
std::lock_guard cache_lock(cache->mutex);
CreateFileSegmentSettings create_settings
{
.is_persistent = is_persistent,
@ -165,10 +164,9 @@ FileSegmentPtr & FileSegmentRangeWriter::allocateFileSegment(size_t offset, bool
/// We set max_file_segment_size to be downloaded,
/// if we have less size to write, file segment will be resized in complete() method.
auto file_segment = cache->createFileSegmentForDownload(
key, offset, cache->max_file_segment_size, create_settings, cache_lock);
key, offset, cache->max_file_segment_size, create_settings);
auto & file_segments = file_segments_holder.file_segments;
return *file_segments.insert(file_segments.end(), file_segment);
return file_segments.add(std::move(file_segment));
}
void FileSegmentRangeWriter::appendFilesystemCacheLog(const FileSegment & file_segment)
@ -202,7 +200,7 @@ void FileSegmentRangeWriter::completeFileSegment(FileSegment & file_segment)
if (file_segment.isDetached() || file_segment.isCompleted())
return;
file_segment.completeWithoutState();
file_segment.complete();
appendFilesystemCacheLog(file_segment);
}

View File

@ -39,7 +39,7 @@ public:
~FileSegmentRangeWriter();
private:
FileSegmentPtr & allocateFileSegment(size_t offset, bool is_persistent);
FileSegment & allocateFileSegment(size_t offset, bool is_persistent);
void appendFilesystemCacheLog(const FileSegment & file_segment);
@ -53,7 +53,7 @@ private:
String query_id;
String source_path;
FileSegmentsHolder file_segments_holder{};
FileSegmentsHolder file_segments{};
size_t expected_write_offset = 0;

View File

@ -0,0 +1,41 @@
#pragma once
#include <Common/ProfileEvents.h>
#include <Common/Stopwatch.h>
namespace DB
{
/// Scope guard for timing a region of code: on destruction it stops its stopwatch
/// and adds the elapsed time, in milliseconds, to the profile event given at construction.
/// NOTE(review): relies on Stopwatch beginning to measure when constructed - confirm in Common/Stopwatch.h.
struct ElapsedMSProfileEventIncrement
{
    /// Event that receives the elapsed milliseconds.
    ProfileEvents::Event event;
    /// Stopwatch created with the CLOCK_MONOTONIC clock type.
    Stopwatch watch;

    explicit ElapsedMSProfileEventIncrement(ProfileEvents::Event event_)
        : event(event_)
        , watch(CLOCK_MONOTONIC)
    {
    }

    ~ElapsedMSProfileEventIncrement()
    {
        watch.stop();
        const auto elapsed_ms = watch.elapsedMilliseconds();
        ProfileEvents::increment(event, elapsed_ms);
    }
};
/// Scope guard for timing a region of code: on destruction it stops its stopwatch
/// and adds the elapsed time, in microseconds, to the profile event given at construction.
/// Nothing is recorded when `cancel` is true at destruction time.
/// NOTE(review): relies on Stopwatch beginning to measure when constructed - confirm in Common/Stopwatch.h.
struct ElapsedUSProfileEventIncrement
{
    /// Event that receives the elapsed microseconds.
    ProfileEvents::Event event;
    /// Stopwatch created with the CLOCK_MONOTONIC clock type.
    Stopwatch watch;
    /// When true, the destructor skips both the stop and the increment.
    bool cancel;

    explicit ElapsedUSProfileEventIncrement(ProfileEvents::Event event_, bool cancel_ = false)
        : event(event_)
        , watch(CLOCK_MONOTONIC)
        , cancel(cancel_)
    {
    }

    ~ElapsedUSProfileEventIncrement()
    {
        if (cancel)
            return;

        watch.stop();
        const auto elapsed_us = watch.elapsedMicroseconds();
        ProfileEvents::increment(event, elapsed_us);
    }
};
}

View File

@ -3,6 +3,7 @@
#include <IO/SeekableReadBuffer.h>
#include <Disks/IO/CachedOnDiskReadBufferFromFile.h>
#include <Disks/ObjectStorages/Cached/CachedObjectStorage.h>
#include <Common/logger_useful.h>
#include <iostream>
#include <Common/hex.h>
@ -35,7 +36,7 @@ ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(
with_cache = settings.remote_fs_cache
&& settings.enable_filesystem_cache
&& (!FileCache::isReadOnly() || settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache);
&& (CachedObjectStorage::canUseReadThroughCache() || settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache);
}
SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(const String & path, size_t file_size)
@ -54,7 +55,7 @@ SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(c
if (with_cache)
{
auto cache_key = settings.remote_fs_cache->hash(path);
auto cache_key = settings.remote_fs_cache->createKeyForPath(path);
return std::make_shared<CachedOnDiskReadBufferFromFile>(
path,
cache_key,

View File

@ -43,7 +43,7 @@ DataSourceDescription CachedObjectStorage::getDataSourceDescription() const
FileCache::Key CachedObjectStorage::getCacheKey(const std::string & path) const
{
return cache->hash(path);
return cache->createKeyForPath(path);
}
String CachedObjectStorage::getCachePath(const std::string & path) const
@ -62,7 +62,7 @@ ReadSettings CachedObjectStorage::patchSettings(const ReadSettings & read_settin
ReadSettings modified_settings{read_settings};
modified_settings.remote_fs_cache = cache;
if (FileCache::isReadOnly())
if (!canUseReadThroughCache())
modified_settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache = true;
return IObjectStorage::patchSettings(modified_settings);
@ -308,4 +308,11 @@ String CachedObjectStorage::getObjectsNamespace() const
return object_storage->getObjectsNamespace();
}
/// Returns true only when the current thread is initialized, has a query context
/// attached and has a non-empty query id. The checks run in that order and stop
/// at the first one that fails.
bool CachedObjectStorage::canUseReadThroughCache()
{
    if (!CurrentThread::isInitialized())
        return false;

    if (!CurrentThread::get().getQueryContext())
        return false;

    return !CurrentThread::getQueryId().empty();
}
}

View File

@ -113,6 +113,8 @@ public:
WriteSettings getAdjustedSettingsFromMetadataFile(const WriteSettings & settings, const std::string & path) const override;
static bool canUseReadThroughCache();
private:
FileCache::Key getCacheKey(const std::string & path) const;

View File

@ -108,6 +108,8 @@ bool AsynchronousReadBufferFromFileDescriptor::nextImpl()
else
{
/// No pending request. Do synchronous read.
Stopwatch watch;
ProfileEvents::increment(ProfileEvents::AsynchronousReadWaitMicroseconds, watch.elapsedMicroseconds());
auto [size, offset] = asyncReadInto(memory.data(), memory.size()).get();
file_offset_of_buffer_end += size;

View File

@ -9,6 +9,8 @@
namespace ProfileEvents
{
extern const Event FileOpen;
extern const Event FileCacheCreateFile1;
extern const Event FileCacheCreateFile2;
}
@ -89,4 +91,24 @@ void ReadBufferFromFile::close()
metric_increment.destroy();
}
/// Constructs the buffer over `file_name_`.
/// When no opened file is supplied (`file_` is null), the file is obtained from
/// OpenedFileCache using `flags` and the descriptor is taken from it.
/// When `file_` is supplied, it is adopted together with `fd_` exactly as passed in.
ReadBufferFromFilePReadWithDescriptorsCache::ReadBufferFromFilePReadWithDescriptorsCache(
    const std::string & file_name_, size_t buf_size, int flags,
    char * existing_memory, size_t alignment, std::optional<size_t> file_size_,
    OpenedFileCache::OpenedFilePtr file_,
    int fd_)
    : ReadBufferFromFileDescriptorPRead(-1, buf_size, existing_memory, alignment, file_size_)
    , file_name(file_name_)
{
    if (!file_)
    {
        /// No handle provided by the caller: open (or reuse) one through the cache.
        file = OpenedFileCache::instance().get(file_name, flags);
        fd = file->getFD();
        return;
    }

    /// NOTE(review): `fd_` is not validated against `file_`; the caller is trusted
    /// to pass the descriptor that belongs to this handle - confirm at call sites.
    file = file_;
    fd = fd_;
}
}

View File

@ -88,13 +88,9 @@ public:
int flags = -1,
char * existing_memory = nullptr,
size_t alignment = 0,
std::optional<size_t> file_size_ = std::nullopt)
: ReadBufferFromFileDescriptorPRead(-1, buf_size, existing_memory, alignment, file_size_)
, file_name(file_name_)
{
file = OpenedFileCache::instance().get(file_name, flags);
fd = file->getFD();
}
std::optional<size_t> file_size_ = std::nullopt,
OpenedFileCache::OpenedFilePtr file_ = nullptr,
int fd = 0);
std::string getFileName() const override
{

File diff suppressed because it is too large Load Diff

View File

@ -19,6 +19,7 @@
#include <Interpreters/Cache/FileCacheKey.h>
#include <Interpreters/Cache/FileCache_fwd.h>
#include <Interpreters/Cache/FileSegment.h>
#include <Interpreters/Cache/Guards.h>
namespace DB
@ -28,11 +29,12 @@ namespace DB
/// Different caching algorithms are implemented using IFileCachePriority.
class FileCache : private boost::noncopyable
{
friend class FileSegment;
friend class IFileCachePriority;
friend struct FileSegmentsHolder;
friend class FileSegmentRangeWriter;
friend struct KeyTransaction;
friend struct KeyTransactionCreator;
friend struct FileSegmentsHolder;
friend class FileSegment;
struct QueryContext;
using QueryContextPtr = std::shared_ptr<QueryContext>;
@ -59,7 +61,7 @@ public:
* As long as pointers to returned file segments are held,
* it is guaranteed that these file segments are not removed from cache.
*/
FileSegmentsHolder getOrSet(const Key & key, size_t offset, size_t size, const CreateFileSegmentSettings & settings);
FileSegmentsHolderPtr getOrSet(const Key & key, size_t offset, size_t size, const CreateFileSegmentSettings & settings);
/**
* Segments in returned list are ordered in ascending order and represent a full contiguous
@ -70,15 +72,15 @@ public:
* with the destruction of the holder, while in getOrSet() EMPTY file segments can eventually change
* their state (and become DOWNLOADED).
*/
FileSegmentsHolder get(const Key & key, size_t offset, size_t size);
FileSegmentsHolderPtr get(const Key & key, size_t offset, size_t size);
/// Remove files by `key`. Removes files which might be used at the moment.
void removeIfExists(const Key & key);
/// Remove files by `key`. Will not remove files which are used at the moment.
void removeIfReleasable();
void removeAllReleasable();
static Key hash(const String & path);
static Key createKeyForPath(const String & path);
String getPathInLocalCache(const Key & key, size_t offset, bool is_persistent) const;
@ -92,7 +94,7 @@ public:
size_t getFileSegmentsNum() const;
static bool isReadOnly();
static bool readThrowCacheAllowed();
/**
* Create a file segment of exactly requested size with EMPTY state.
@ -104,14 +106,15 @@ public:
const Key & key,
size_t offset,
size_t size,
const CreateFileSegmentSettings & create_settings,
std::lock_guard<std::mutex> & cache_lock);
const CreateFileSegmentSettings & create_settings);
FileSegments getSnapshot() const;
/// For debug.
String dumpStructure(const Key & key);
bool tryReserve(const Key & key, size_t offset, size_t size);
/// Save a query context information, and adopt different cache policies
/// for different queries through the context cache layer.
struct QueryContextHolder : private boost::noncopyable
@ -130,6 +133,8 @@ public:
QueryContextHolder getQueryContextHolder(const String & query_id, const ReadSettings & settings);
private:
using KeyAndOffset = FileCacheKeyAndOffset;
String cache_base_path;
const size_t max_size;
@ -137,50 +142,18 @@ private:
const size_t max_file_segment_size;
const bool allow_persistent_files;
const size_t enable_cache_hits_threshold;
const bool enable_filesystem_query_cache_limit;
const bool enable_bypass_cache_with_threshold;
const size_t bypass_cache_threshold;
const bool enable_bypass_cache_with_threashold;
const size_t bypass_cache_threashold;
mutable std::mutex mutex;
Poco::Logger * log;
bool is_initialized = false;
std::exception_ptr initialization_exception;
void assertInitialized(std::lock_guard<std::mutex> & cache_lock) const;
bool tryReserve(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock);
void remove(
Key key,
size_t offset,
std::lock_guard<std::mutex> & cache_lock,
std::unique_lock<std::mutex> & segment_lock);
void remove(
FileSegmentPtr file_segment,
std::lock_guard<std::mutex> & cache_lock);
bool isLastFileSegmentHolder(
const Key & key,
size_t offset,
std::lock_guard<std::mutex> & cache_lock,
std::unique_lock<std::mutex> & segment_lock);
void reduceSizeToDownloaded(
const Key & key,
size_t offset,
std::lock_guard<std::mutex> & cache_lock,
std::unique_lock<std::mutex> & segment_lock);
struct FileSegmentCell : private boost::noncopyable
{
FileSegmentPtr file_segment;
/// Iterator is put here on first reservation attempt, if successful.
IFileCachePriority::WriteIterator queue_iterator;
IFileCachePriority::Iterator queue_iterator;
/// Pointer to file segment is always hold by the cache itself.
/// Apart from pointer in cache, it can be hold by cache users, when they call
@ -189,54 +162,92 @@ private:
size_t size() const { return file_segment->reserved_size; }
FileSegmentCell(FileSegmentPtr file_segment_, FileCache * cache, std::lock_guard<std::mutex> & cache_lock);
FileSegmentCell(
FileSegmentPtr file_segment_,
KeyTransaction & key_transaction,
IFileCachePriority & priority_queue);
FileSegmentCell(FileSegmentCell && other) noexcept
: file_segment(std::move(other.file_segment)), queue_iterator(std::move(other.queue_iterator)) {}
};
using AccessKeyAndOffset = std::pair<Key, size_t>;
struct KeyAndOffsetHash
struct CacheCells : public std::map<size_t, FileSegmentCell>
{
std::size_t operator()(const AccessKeyAndOffset & key) const
{
return std::hash<UInt128>()(key.first.key) ^ std::hash<UInt64>()(key.second);
}
const FileSegmentCell * get(size_t offset) const;
FileSegmentCell * get(size_t offset);
const FileSegmentCell * tryGet(size_t offset) const;
FileSegmentCell * tryGet(size_t offset);
std::string toString() const;
bool created_base_directory = false;
};
using CacheCellsPtr = std::shared_ptr<CacheCells>;
mutable CacheGuard cache_guard;
/// Stage of cache initialization. The `initialization_state` member of the cache
/// starts out as NOT_INITIALIZED.
/// NOTE(review): the transitions between states are not visible in this header - see the implementation.
enum class InitializationState
{
NOT_INITIALIZED,
INITIALIZING,
INITIALIZED,
FAILED,
};
InitializationState initialization_state = InitializationState::NOT_INITIALIZED;
mutable std::condition_variable initialization_cv;
std::exception_ptr initialization_exception;
using CachedFiles = std::unordered_map<Key, CacheCellsPtr>;
CachedFiles files;
using KeyPrefix = std::string;
using KeysLocksMap = std::unordered_map<KeyPrefix, KeyPrefixGuardPtr>;
KeysLocksMap keys_locks;
/// Policy passed to createKeyTransaction() as `key_not_found_policy`.
/// NOTE(review): presumably selects what happens when the requested key has no entry yet
/// (throw, create an empty entry, or return null) - confirm against the implementation.
enum class KeyNotFoundPolicy
{
THROW,
CREATE_EMPTY,
RETURN_NULL,
};
using FileSegmentsByOffset = std::map<size_t, FileSegmentCell>;
using CachedFiles = std::unordered_map<Key, FileSegmentsByOffset>;
using FileCacheRecords = std::unordered_map<AccessKeyAndOffset, IFileCachePriority::WriteIterator, KeyAndOffsetHash>;
KeyTransactionPtr createKeyTransaction(const Key & key, KeyNotFoundPolicy key_not_found_policy, bool assert_initialized = true);
CachedFiles files;
std::unique_ptr<IFileCachePriority> main_priority;
KeyTransactionCreatorPtr getKeyTransactionCreator(const Key & key, KeyTransaction & key_transaction);
FileCacheRecords stash_records;
std::unique_ptr<IFileCachePriority> stash_priority;
size_t max_stash_element_size;
FileCachePriorityPtr main_priority;
void loadCacheInfoIntoMemory(std::lock_guard<std::mutex> & cache_lock);
struct HitsCountStash
{
HitsCountStash(size_t max_stash_queue_size_, size_t cache_hits_threshold_, FileCachePriorityPtr queue_)
: max_stash_queue_size(max_stash_queue_size_)
, cache_hits_threshold(cache_hits_threshold_)
, queue(std::move(queue_)) {}
FileSegments getImpl(const Key & key, const FileSegment::Range & range, std::lock_guard<std::mutex> & cache_lock);
const size_t max_stash_queue_size;
const size_t cache_hits_threshold;
FileSegmentCell * getCell(const Key & key, size_t offset, std::lock_guard<std::mutex> & cache_lock);
auto lock() const { return queue->lock(); }
FileSegmentCell * addCell(
FileCachePriorityPtr queue;
using Records = std::unordered_map<KeyAndOffset, IFileCachePriority::Iterator, FileCacheKeyAndOffsetHash>;
Records records;
};
mutable HitsCountStash stash;
protected:
void assertCacheCorrectness();
void assertInitializedUnlocked(CacheGuard::Lock & cache_lock) const;
private:
FileSegments getImpl(
const Key & key,
size_t offset,
size_t size,
FileSegment::State state,
const CreateFileSegmentSettings & create_settings,
std::lock_guard<std::mutex> & cache_lock);
static void useCell(const FileSegmentCell & cell, FileSegments & result, std::lock_guard<std::mutex> & cache_lock);
bool tryReserveForMainList(
const Key & key,
size_t offset,
size_t size,
QueryContextPtr query_context,
std::lock_guard<std::mutex> & cache_lock);
const FileSegment::Range & range,
const KeyTransaction & key_transaction) const;
FileSegments splitRangeIntoCells(
const Key & key,
@ -244,9 +255,7 @@ private:
size_t size,
FileSegment::State state,
const CreateFileSegmentSettings & create_settings,
std::lock_guard<std::mutex> & cache_lock);
String dumpStructureUnlocked(const Key & key_, std::lock_guard<std::mutex> & cache_lock);
KeyTransaction & key_transaction);
void fillHolesWithEmptyFileSegments(
FileSegments & file_segments,
@ -254,23 +263,50 @@ private:
const FileSegment::Range & range,
bool fill_with_detached_file_segments,
const CreateFileSegmentSettings & settings,
std::lock_guard<std::mutex> & cache_lock);
KeyTransaction & key_transaction);
size_t getUsedCacheSizeUnlocked(std::lock_guard<std::mutex> & cache_lock) const;
void loadCacheInfoIntoMemory();
size_t getAvailableCacheSizeUnlocked(std::lock_guard<std::mutex> & cache_lock) const;
CacheCells::iterator addCell(
const Key & key,
size_t offset,
size_t size,
FileSegment::State state,
const CreateFileSegmentSettings & create_settings,
KeyTransaction & key_transaction);
bool tryReserveUnlocked(
const Key & key,
size_t offset,
size_t size,
KeyTransaction & key_transaction);
bool tryReserveInCache(
const Key & key,
size_t offset,
size_t size,
QueryContextPtr query_context,
KeyTransaction & key_transaction);
bool tryReserveInQueryCache(
const Key & key,
size_t offset,
size_t size,
QueryContextPtr query_context,
KeyTransaction & key_transaction);
size_t getFileSegmentsNumUnlocked(std::lock_guard<std::mutex> & cache_lock) const;
void assertCacheCellsCorrectness(const FileSegmentsByOffset & cells_by_offset, std::lock_guard<std::mutex> & cache_lock);
void removeKeyDirectoryIfExists(const Key & key, const KeyPrefixGuard::Lock & lock) const;
void removeKeyDirectoryIfExists(const Key & key, std::lock_guard<std::mutex> & cache_lock) const;
String dumpStructureUnlocked(const Key & key_, const CacheGuard::Lock & lock);
/// Used to track and control the cache access of each query.
/// Through it, we can realize the processing of different queries by the cache layer.
struct QueryContext
{
FileCacheRecords records;
std::mutex mutex;
HitsCountStash::Records records;
FileCachePriorityPtr priority;
size_t cache_size = 0;
@ -286,34 +322,82 @@ private:
size_t getCacheSize() const { return cache_size; }
FileCachePriorityPtr getPriority() const { return priority; }
IFileCachePriority & getPriority() const { return *priority; }
bool isSkipDownloadIfExceed() const { return skip_download_if_exceeds_query_cache; }
void remove(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock);
void remove(const Key & key, size_t offset, size_t size, KeyTransaction & key_transaction);
void reserve(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock);
void reserve(const Key & key, size_t offset, size_t size, KeyTransaction & key_transaction);
void use(const Key & key, size_t offset, std::lock_guard<std::mutex> & cache_lock);
void use(const Key & key, size_t offset, KeyTransaction & key_transaction);
};
using QueryContextMap = std::unordered_map<String, QueryContextPtr>;
QueryContextMap query_map;
std::mutex query_context_mutex;
QueryContextPtr getCurrentQueryContext(std::lock_guard<std::mutex> & cache_lock);
QueryContextPtr getCurrentQueryContext();
QueryContextPtr getQueryContext(const String & query_id, std::lock_guard<std::mutex> & cache_lock);
QueryContextPtr getQueryContextUnlocked(const String & query_id, std::lock_guard<std::mutex> &);
void removeQueryContext(const String & query_id);
QueryContextPtr getOrSetQueryContext(const String & query_id, const ReadSettings & settings, std::lock_guard<std::mutex> &);
QueryContextPtr getOrSetQueryContext(const String & query_id, const ReadSettings & settings);
};
struct KeyTransaction;
using KeyTransactionPtr = std::unique_ptr<KeyTransaction>;
/// Holds a key-prefix guard together with the cache cells of a key.
/// create() returns a KeyTransactionPtr; its definition is not part of this header.
struct KeyTransactionCreator
{
    KeyPrefixGuardPtr guard;
    FileCache::CacheCellsPtr offsets;

    KeyTransactionCreator(KeyPrefixGuardPtr guard_, FileCache::CacheCellsPtr offsets_)
        : guard(guard_)
        , offsets(offsets_)
    {
    }

    KeyTransactionPtr create();
};
using KeyTransactionCreatorPtr = std::unique_ptr<KeyTransactionCreator>;
struct KeyTransaction : private boost::noncopyable
{
using Key = FileCacheKey;
KeyTransaction(KeyPrefixGuardPtr guard_, FileCache::CacheCellsPtr offsets_, std::shared_ptr<CachePriorityQueueGuard::Lock> queue_lock_ = nullptr);
KeyTransactionCreatorPtr getCreator() { return std::make_unique<KeyTransactionCreator>(guard, offsets); }
void remove(FileSegmentPtr file_segment);
void reduceSizeToDownloaded(const Key & key, size_t offset, const FileSegmentGuard::Lock &);
void remove(const Key & key, size_t offset, const FileSegmentGuard::Lock &);
FileCache::CacheCells & getOffsets() { return *offsets; }
const FileCache::CacheCells & getOffsets() const { return *offsets; }
std::vector<size_t> delete_offsets;
const CachePriorityQueueGuard::Lock & getQueueLock() const
{
if (!queue_lock)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Queue is not locked");
return *queue_lock;
}
private:
KeyPrefixGuardPtr guard;
const KeyPrefixGuard::Lock lock;
FileCache::CacheCellsPtr offsets;
Poco::Logger * log;
public:
void assertCacheCorrectness(const Key & key, std::lock_guard<std::mutex> & cache_lock);
std::shared_ptr<CachePriorityQueueGuard::Lock> queue_lock;
void assertCacheCorrectness(std::lock_guard<std::mutex> & cache_lock);
void assertPriorityCorrectness(std::lock_guard<std::mutex> & cache_lock);
};
}

View File

@ -0,0 +1,27 @@
#include "FileCacheKey.h"
#include <Common/hex.h>
#include <Common/SipHash.h>
namespace DB
{
/// Builds a key from a path: the path bytes are hashed with sipHash128 and the
/// result is handed to the UInt128 constructor, which also fills in `key_prefix`.
FileCacheKey::FileCacheKey(const std::string & path)
    : FileCacheKey(sipHash128(path.data(), path.size()))
{
}
/// Builds a key from an already computed hash value.
/// `key_prefix` becomes the first three characters of toString().
FileCacheKey::FileCacheKey(const UInt128 & key_)
    : key(key_)
    , key_prefix(toString(), 0, 3) /// `key` is declared (and so initialized) before `key_prefix`, so toString() sees the final value.
{
}
std::string FileCacheKey::toString() const
{
return getHexUIntLowercase(key);
}
}

View File

@ -1,23 +1,34 @@
#pragma once
#include <Core/Types.h>
#include <Common/hex.h>
namespace DB
{
struct FileCacheKey
{
/// Hash of the path.
UInt128 key;
/// First three characters of the hex representation of `key` (see toString()).
std::string key_prefix;
String toString() const { return getHexUIntLowercase(key); }
std::string toString() const;
FileCacheKey() = default;
explicit FileCacheKey(const std::string & path);
explicit FileCacheKey(const UInt128 & key_) : key(key_) { }
explicit FileCacheKey(const UInt128 & path);
bool operator==(const FileCacheKey & other) const { return key == other.key; }
};
using FileCacheKeyAndOffset = std::pair<FileCacheKey, size_t>;
struct FileCacheKeyAndOffsetHash
{
std::size_t operator()(const FileCacheKeyAndOffset & key) const
{
return std::hash<UInt128>()(key.first.key) ^ std::hash<UInt64>()(key.second);
}
};
}
namespace std

View File

@ -26,15 +26,18 @@ FileSegment::FileSegment(
size_t offset_,
size_t size_,
const Key & key_,
KeyTransactionCreatorPtr key_transaction_creator_,
FileCache * cache_,
State download_state_,
const CreateFileSegmentSettings & settings)
: segment_range(offset_, offset_ + size_ - 1)
, download_state(download_state_)
, key_transaction_creator(std::move(key_transaction_creator_))
, file_key(key_)
, file_path(cache_->getPathInLocalCache(key(), offset(), settings.is_persistent))
, cache(cache_)
#ifndef NDEBUG
, log(&Poco::Logger::get(fmt::format("FileSegment({}) : {}", getHexUIntLowercase(key_), range().toString())))
, log(&Poco::Logger::get(fmt::format("FileSegment({}) : {}", key_.toString(), range().toString())))
#else
, log(&Poco::Logger::get("FileSegment"))
#endif
@ -55,11 +58,17 @@ FileSegment::FileSegment(
{
reserved_size = downloaded_size = size_;
is_downloaded = true;
chassert(std::filesystem::file_size(getPathInLocalCache()) == size_);
is_completed = true;
file = OpenedFileCache::instance().get(file_path, -1);
fd = file->getFD();
chassert(std::filesystem::file_size(file_path) == size_);
break;
}
case (State::SKIP_CACHE):
{
is_detached = true;
is_completed = true;
//CurrentMetrics::add(CurrentMetrics::CacheDetachedFileSegments);
break;
}
default:
@ -71,18 +80,13 @@ FileSegment::FileSegment(
}
}
String FileSegment::getPathInLocalCache() const
{
return cache->getPathInLocalCache(key(), offset(), isPersistent());
}
FileSegment::State FileSegment::state() const
{
std::unique_lock segment_lock(mutex);
auto lock = segment_guard.lock();
return download_state;
}
void FileSegment::setDownloadState(State state)
void FileSegment::setDownloadState(State state, const FileSegmentGuard::Lock &)
{
LOG_TEST(log, "Updated state from {} to {}", stateToString(download_state), stateToString(state));
download_state = state;
@ -90,33 +94,33 @@ void FileSegment::setDownloadState(State state)
size_t FileSegment::getFirstNonDownloadedOffset() const
{
std::unique_lock segment_lock(mutex);
return getFirstNonDownloadedOffsetUnlocked(segment_lock);
auto lock = segment_guard.lock();
return getFirstNonDownloadedOffsetUnlocked(lock);
}
size_t FileSegment::getFirstNonDownloadedOffsetUnlocked(std::unique_lock<std::mutex> & segment_lock) const
size_t FileSegment::getFirstNonDownloadedOffsetUnlocked(const FileSegmentGuard::Lock & lock) const
{
return range().left + getDownloadedSizeUnlocked(segment_lock);
return range().left + getDownloadedSizeUnlocked(lock);
}
size_t FileSegment::getCurrentWriteOffset() const
{
std::unique_lock segment_lock(mutex);
return getCurrentWriteOffsetUnlocked(segment_lock);
auto lock = segment_guard.lock();
return getCurrentWriteOffsetUnlocked(lock);
}
size_t FileSegment::getCurrentWriteOffsetUnlocked(std::unique_lock<std::mutex> & segment_lock) const
size_t FileSegment::getCurrentWriteOffsetUnlocked(const FileSegmentGuard::Lock & lock) const
{
return getFirstNonDownloadedOffsetUnlocked(segment_lock);
return getFirstNonDownloadedOffsetUnlocked(lock);
}
size_t FileSegment::getDownloadedSize() const
{
std::unique_lock segment_lock(mutex);
return getDownloadedSizeUnlocked(segment_lock);
auto lock = segment_guard.lock();
return getDownloadedSizeUnlocked(lock);
}
size_t FileSegment::getDownloadedSizeUnlocked(std::unique_lock<std::mutex> & /* segment_lock */) const
size_t FileSegment::getDownloadedSizeUnlocked(const FileSegmentGuard::Lock &) const
{
if (download_state == State::DOWNLOADED)
return downloaded_size;
@ -127,7 +131,7 @@ size_t FileSegment::getDownloadedSizeUnlocked(std::unique_lock<std::mutex> & /*
bool FileSegment::isDownloaded() const
{
std::lock_guard segment_lock(mutex);
auto lock = segment_guard.lock();
return is_downloaded;
}
@ -143,70 +147,71 @@ String FileSegment::getCallerId()
String FileSegment::getDownloader() const
{
std::unique_lock segment_lock(mutex);
return getDownloaderUnlocked(segment_lock);
auto lock = segment_guard.lock();
return getDownloaderUnlocked(lock);
}
String FileSegment::getDownloaderUnlocked(std::unique_lock<std::mutex> & /* segment_lock */) const
String FileSegment::getDownloaderUnlocked(const FileSegmentGuard::Lock &) const
{
return downloader_id;
}
String FileSegment::getOrSetDownloader()
{
std::unique_lock segment_lock(mutex);
auto lock = segment_guard.lock();
assertNotDetachedUnlocked(segment_lock);
assertNotDetachedUnlocked(lock);
auto current_downloader = getDownloaderUnlocked(segment_lock);
auto current_downloader = getDownloaderUnlocked(lock);
if (current_downloader.empty())
{
bool allow_new_downloader = download_state == State::EMPTY || download_state == State::PARTIALLY_DOWNLOADED;
const auto caller_id = getCallerId();
bool allow_new_downloader = download_state == State::EMPTY || download_state == State::PARTIALLY_DOWNLOADED || !caller_id.starts_with("None");
if (!allow_new_downloader)
return "notAllowed:" + stateToString(download_state);
current_downloader = downloader_id = getCallerId();
setDownloadState(State::DOWNLOADING);
current_downloader = downloader_id = caller_id;
setDownloadState(State::DOWNLOADING, lock);
}
return current_downloader;
}
void FileSegment::resetDownloadingStateUnlocked([[maybe_unused]] std::unique_lock<std::mutex> & segment_lock)
void FileSegment::resetDownloadingStateUnlocked([[maybe_unused]] const FileSegmentGuard::Lock & lock)
{
assert(isDownloaderUnlocked(segment_lock));
assert(isDownloaderUnlocked(lock));
assert(download_state == State::DOWNLOADING);
size_t current_downloaded_size = getDownloadedSizeUnlocked(segment_lock);
size_t current_downloaded_size = getDownloadedSizeUnlocked(lock);
/// range().size() can equal 0 in case of write-though cache.
if (current_downloaded_size != 0 && current_downloaded_size == range().size())
setDownloadedUnlocked(segment_lock);
setDownloadedUnlocked(lock);
else
setDownloadState(State::PARTIALLY_DOWNLOADED);
setDownloadState(State::PARTIALLY_DOWNLOADED, lock);
}
void FileSegment::resetDownloader()
{
std::unique_lock segment_lock(mutex);
auto lock = segment_guard.lock();
assertNotDetachedUnlocked(segment_lock);
assertIsDownloaderUnlocked("resetDownloader", segment_lock);
assertNotDetachedUnlocked(lock);
assertIsDownloaderUnlocked("resetDownloader", lock);
resetDownloadingStateUnlocked(segment_lock);
resetDownloaderUnlocked(segment_lock);
resetDownloadingStateUnlocked(lock);
resetDownloaderUnlocked(lock);
}
void FileSegment::resetDownloaderUnlocked(std::unique_lock<std::mutex> & /* segment_lock */)
void FileSegment::resetDownloaderUnlocked(const FileSegmentGuard::Lock &)
{
LOG_TEST(log, "Resetting downloader from {}", downloader_id);
downloader_id.clear();
}
void FileSegment::assertIsDownloaderUnlocked(const std::string & operation, std::unique_lock<std::mutex> & segment_lock) const
void FileSegment::assertIsDownloaderUnlocked(const std::string & operation, const FileSegmentGuard::Lock & lock) const
{
auto caller = getCallerId();
auto current_downloader = getDownloaderUnlocked(segment_lock);
auto current_downloader = getDownloaderUnlocked(lock);
LOG_TEST(log, "Downloader id: {}, caller id: {}", current_downloader, caller);
if (caller != current_downloader)
@ -221,41 +226,53 @@ void FileSegment::assertIsDownloaderUnlocked(const std::string & operation, std:
bool FileSegment::isDownloader() const
{
std::unique_lock segment_lock(mutex);
return isDownloaderUnlocked(segment_lock);
auto lock = segment_guard.lock();
return isDownloaderUnlocked(lock);
}
bool FileSegment::isDownloaderUnlocked(std::unique_lock<std::mutex> & segment_lock) const
bool FileSegment::isDownloaderUnlocked(const FileSegmentGuard::Lock & lock) const
{
return getCallerId() == getDownloaderUnlocked(segment_lock);
return getCallerId() == getDownloaderUnlocked(lock);
}
FileSegment::RemoteFileReaderPtr FileSegment::getRemoteFileReader()
{
std::unique_lock segment_lock(mutex);
assertIsDownloaderUnlocked("getRemoteFileReader", segment_lock);
auto lock = segment_guard.lock();
assertIsDownloaderUnlocked("getRemoteFileReader", lock);
return remote_file_reader;
}
/// Tells whether the caller is the last holder of this file segment.
/// Looks up the cell at this segment's offset through the given key transaction and
/// compares the use count of the cell's `file_segment` pointer with 2
/// (presumably one reference owned by the cache cell and one by the caller — verify against callers).
/// Must not be called for a detached file segment (checked with chassert).
bool FileSegment::isLastHolder(const KeyTransaction & key_transaction) const
{
    chassert(!is_detached);

    return key_transaction.getOffsets().get(offset())->file_segment.use_count() == 2;
}
FileSegment::RemoteFileReaderPtr FileSegment::extractRemoteFileReader()
{
std::lock_guard cache_lock(cache->mutex);
std::unique_lock segment_lock(mutex);
if (!is_detached)
auto key_transaction = createKeyTransaction(false);
if (!key_transaction)
{
bool is_last_holder = cache->isLastFileSegmentHolder(key(), offset(), cache_lock, segment_lock);
if (!downloader_id.empty() || !is_last_holder)
return nullptr;
assert(isDetached());
return std::move(remote_file_reader);
}
auto segment_lock = segment_guard.lock();
assert(!is_detached);
bool is_last_holder = isLastHolder(*key_transaction);
if (!downloader_id.empty() || !is_last_holder)
return nullptr;
return std::move(remote_file_reader);
}
void FileSegment::setRemoteFileReader(RemoteFileReaderPtr remote_file_reader_)
{
std::unique_lock segment_lock(mutex);
assertIsDownloaderUnlocked("setRemoteFileReader", segment_lock);
auto lock = segment_guard.lock();
assertIsDownloaderUnlocked("setRemoteFileReader", lock);
if (remote_file_reader)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Remote file reader already exists");
@ -265,8 +282,8 @@ void FileSegment::setRemoteFileReader(RemoteFileReaderPtr remote_file_reader_)
void FileSegment::resetRemoteFileReader()
{
std::unique_lock segment_lock(mutex);
assertIsDownloaderUnlocked("resetRemoteFileReader", segment_lock);
auto lock = segment_guard.lock();
assertIsDownloaderUnlocked("resetRemoteFileReader", lock);
if (!remote_file_reader)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Remote file reader does not exist");
@ -280,24 +297,24 @@ void FileSegment::write(const char * from, size_t size, size_t offset)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Writing zero size is not allowed");
{
std::unique_lock segment_lock(mutex);
auto lock = segment_guard.lock();
assertIsDownloaderUnlocked("write", segment_lock);
assertNotDetachedUnlocked(segment_lock);
assertIsDownloaderUnlocked("write", lock);
assertNotDetachedUnlocked(lock);
if (download_state != State::DOWNLOADING)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Expected DOWNLOADING state, got {}", stateToString(download_state));
size_t first_non_downloaded_offset = getFirstNonDownloadedOffsetUnlocked(segment_lock);
size_t first_non_downloaded_offset = getFirstNonDownloadedOffsetUnlocked(lock);
if (offset != first_non_downloaded_offset)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Attempt to write {} bytes to offset: {}, but current write offset is {}",
size, offset, first_non_downloaded_offset);
size_t current_downloaded_size = getDownloadedSizeUnlocked(segment_lock);
size_t current_downloaded_size = getDownloadedSizeUnlocked(lock);
chassert(reserved_size >= current_downloaded_size);
size_t free_reserved_size = reserved_size - current_downloaded_size;
@ -317,8 +334,7 @@ void FileSegment::write(const char * from, size_t size, size_t offset)
"Cache writer was finalized (downloaded size: {}, state: {})",
current_downloaded_size, stateToString(download_state));
auto download_path = getPathInLocalCache();
cache_writer = std::make_unique<WriteBufferFromFile>(download_path);
cache_writer = std::make_unique<WriteBufferFromFile>(file_path);
}
}
@ -332,15 +348,15 @@ void FileSegment::write(const char * from, size_t size, size_t offset)
downloaded_size += size;
chassert(std::filesystem::file_size(getPathInLocalCache()) == downloaded_size);
chassert(std::filesystem::file_size(file_path) == downloaded_size);
}
catch (Exception & e)
{
std::unique_lock segment_lock(mutex);
auto lock = segment_guard.lock();
wrapWithCacheInfo(e, "while writing into cache", segment_lock);
wrapWithCacheInfo(e, "while writing into cache", lock);
setDownloadFailedUnlocked(segment_lock);
setDownloadFailedUnlocked(lock);
cv.notify_all();
@ -352,7 +368,7 @@ void FileSegment::write(const char * from, size_t size, size_t offset)
FileSegment::State FileSegment::wait()
{
std::unique_lock segment_lock(mutex);
auto lock = segment_guard.lock();
if (is_detached)
throw Exception(
@ -369,15 +385,27 @@ FileSegment::State FileSegment::wait()
{
LOG_TEST(log, "{} waiting on: {}, current downloader: {}", getCallerId(), range().toString(), downloader_id);
chassert(!getDownloaderUnlocked(segment_lock).empty());
chassert(!isDownloaderUnlocked(segment_lock));
chassert(!getDownloaderUnlocked(lock).empty());
chassert(!isDownloaderUnlocked(lock));
cv.wait_for(segment_lock, std::chrono::seconds(60));
cv.wait_for(lock.lock, std::chrono::seconds(60));
}
return download_state;
}
/// Creates a key transaction through `key_transaction_creator`.
/// If there is no creator: throws LOGICAL_ERROR when `assert_exists` is true,
/// otherwise returns nullptr, so callers passing `false` must check the result.
KeyTransactionPtr FileSegment::createKeyTransaction(bool assert_exists) const
{
    if (key_transaction_creator)
        return key_transaction_creator->create();

    if (assert_exists)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot create key transaction: creator does not exist");

    return nullptr;
}
bool FileSegment::reserve(size_t size_to_reserve)
{
if (!size_to_reserve)
@ -386,12 +414,14 @@ bool FileSegment::reserve(size_t size_to_reserve)
size_t expected_downloaded_size;
{
std::unique_lock segment_lock(mutex);
auto lock = segment_guard.lock();
assertNotDetachedUnlocked(segment_lock);
assertIsDownloaderUnlocked("reserve", segment_lock);
LOG_TRACE(log, "Try reserve for {}", getInfoForLogUnlocked(lock));
expected_downloaded_size = getDownloadedSizeUnlocked(segment_lock);
assertNotDetachedUnlocked(lock);
assertIsDownloaderUnlocked("reserve", lock);
expected_downloaded_size = getDownloadedSizeUnlocked(lock);
if (expected_downloaded_size + size_to_reserve > range().size())
throw Exception(
@ -413,14 +443,12 @@ bool FileSegment::reserve(size_t size_to_reserve)
bool reserved = already_reserved_size >= size_to_reserve;
if (!reserved)
{
std::lock_guard cache_lock(cache->mutex);
size_to_reserve = size_to_reserve - already_reserved_size;
reserved = cache->tryReserve(key(), offset(), size_to_reserve, cache_lock);
reserved = cache->tryReserve(key(), offset(), size_to_reserve);
if (reserved)
{
std::lock_guard segment_lock(mutex);
auto lock = segment_guard.lock();
reserved_size += size_to_reserve;
}
}
@ -428,7 +456,25 @@ bool FileSegment::reserve(size_t size_to_reserve)
return reserved;
}
void FileSegment::setDownloadedUnlocked([[maybe_unused]] std::unique_lock<std::mutex> & segment_lock)
/// Returns the opened file object stored in `file`.
/// Takes the segment lock; throws LOGICAL_ERROR if `is_downloaded` is not set.
OpenedFileCache::OpenedFilePtr FileSegment::getFile() const
{
    auto segment_lock = segment_guard.lock();

    if (is_downloaded)
    {
        chassert(file);
        return file;
    }

    throw Exception(ErrorCodes::LOGICAL_ERROR, "File segment is not downloaded");
}
/// Returns the file descriptor stored in `fd`.
/// Takes the segment lock; throws LOGICAL_ERROR if `is_downloaded` is not set.
int FileSegment::getFD() const
{
    auto segment_lock = segment_guard.lock();

    if (is_downloaded)
    {
        chassert(file);
        return fd;
    }

    throw Exception(ErrorCodes::LOGICAL_ERROR, "File segment is not downloaded");
}
void FileSegment::setDownloadedUnlocked([[maybe_unused]] const FileSegmentGuard::Lock & lock)
{
if (is_downloaded)
return;
@ -443,16 +489,19 @@ void FileSegment::setDownloadedUnlocked([[maybe_unused]] std::unique_lock<std::m
download_state = State::DOWNLOADED;
is_downloaded = true;
assert(getDownloadedSizeUnlocked(segment_lock) > 0);
assert(std::filesystem::file_size(getPathInLocalCache()) > 0);
file = OpenedFileCache::instance().get(file_path, -1);
fd = file->getFD();
assert(getDownloadedSizeUnlocked(lock) > 0);
assert(std::filesystem::file_size(file_path) > 0);
}
void FileSegment::setDownloadFailedUnlocked(std::unique_lock<std::mutex> & segment_lock)
void FileSegment::setDownloadFailedUnlocked(const FileSegmentGuard::Lock & lock)
{
LOG_INFO(log, "Settings download as failed: {}", getInfoForLogUnlocked(segment_lock));
LOG_INFO(log, "Settings download as failed: {}", getInfoForLogUnlocked(lock));
setDownloadState(State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
resetDownloaderUnlocked(segment_lock);
setDownloadState(State::PARTIALLY_DOWNLOADED_NO_CONTINUATION, lock);
resetDownloaderUnlocked(lock);
if (cache_writer)
{
@ -464,64 +513,59 @@ void FileSegment::setDownloadFailedUnlocked(std::unique_lock<std::mutex> & segme
void FileSegment::completePartAndResetDownloader()
{
std::unique_lock segment_lock(mutex);
completePartAndResetDownloaderUnlocked(segment_lock);
auto lock = segment_guard.lock();
completePartAndResetDownloaderUnlocked(lock);
}
void FileSegment::completePartAndResetDownloaderUnlocked(std::unique_lock<std::mutex> & segment_lock)
void FileSegment::completePartAndResetDownloaderUnlocked(const FileSegmentGuard::Lock & lock)
{
assertNotDetachedUnlocked(segment_lock);
assertIsDownloaderUnlocked("completePartAndResetDownloader", segment_lock);
assertNotDetachedUnlocked(lock);
assertIsDownloaderUnlocked("completePartAndResetDownloader", lock);
resetDownloadingStateUnlocked(segment_lock);
resetDownloaderUnlocked(segment_lock);
resetDownloadingStateUnlocked(lock);
resetDownloaderUnlocked(lock);
LOG_TEST(log, "Complete batch. ({})", getInfoForLogUnlocked(segment_lock));
LOG_TEST(log, "Complete batch. ({})", getInfoForLogUnlocked(lock));
cv.notify_all();
}
void FileSegment::completeWithState(State state)
void FileSegment::setBroken()
{
std::lock_guard cache_lock(cache->mutex);
std::unique_lock segment_lock(mutex);
auto lock = segment_guard.lock();
assertNotDetachedUnlocked(lock);
assertIsDownloaderUnlocked("setBroken", lock);
resetDownloadingStateUnlocked(lock);
resetDownloaderUnlocked(lock);
}
assertNotDetachedUnlocked(segment_lock);
assertIsDownloaderUnlocked("complete", segment_lock);
/// Completes the file segment.
/// Takes a shared lock from `cache->main_priority` first, then creates a key transaction,
/// stores the lock in the transaction's `queue_lock` and delegates to completeUnlocked().
/// NOTE(review): relies on createKeyTransaction() with its default argument never returning
/// nullptr here — confirm against the declaration in the header.
void FileSegment::complete()
{
    auto priority_lock = cache->main_priority->lockShared();

    auto transaction = createKeyTransaction();
    transaction->queue_lock = priority_lock;

    completeUnlocked(*transaction);
}
if (state != State::DOWNLOADED
&& state != State::PARTIALLY_DOWNLOADED
&& state != State::PARTIALLY_DOWNLOADED_NO_CONTINUATION)
void FileSegment::completeUnlocked(KeyTransaction & key_transaction)
{
auto segment_lock = segment_guard.lock();
if (is_detached)
{
cv.notify_all();
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot complete file segment with state: {}", stateToString(state));
assertDetachedStatus(segment_lock);
return;
}
setDownloadState(state);
completeBasedOnCurrentState(cache_lock, segment_lock);
}
void FileSegment::completeWithoutState()
{
std::lock_guard cache_lock(cache->mutex);
completeWithoutStateUnlocked(cache_lock);
}
void FileSegment::completeWithoutStateUnlocked(std::lock_guard<std::mutex> & cache_lock)
{
std::unique_lock segment_lock(mutex);
completeBasedOnCurrentState(cache_lock, segment_lock);
}
void FileSegment::completeBasedOnCurrentState(std::lock_guard<std::mutex> & cache_lock, std::unique_lock<std::mutex> & segment_lock)
{
if (is_detached)
if (is_completed)
return;
bool is_downloader = isDownloaderUnlocked(segment_lock);
bool is_last_holder = cache->isLastFileSegmentHolder(key(), offset(), cache_lock, segment_lock);
size_t current_downloaded_size = getDownloadedSizeUnlocked(segment_lock);
const bool is_downloader = isDownloaderUnlocked(segment_lock);
const bool is_last_holder = isLastHolder(key_transaction);
const size_t current_downloaded_size = getDownloadedSizeUnlocked(segment_lock);
auto queue_iter = key_transaction.getOffsets().get(offset())->queue_iterator;
if (queue_iter)
queue_iter->use(key_transaction.getQueueLock());
SCOPE_EXIT({
if (is_downloader)
@ -531,38 +575,26 @@ void FileSegment::completeBasedOnCurrentState(std::lock_guard<std::mutex> & cach
});
LOG_TEST(
log,
"Complete based on current state (is_last_holder: {}, {})",
log, "Complete based on current state (is_last_holder: {}, {})",
is_last_holder, getInfoForLogUnlocked(segment_lock));
if (is_downloader)
{
if (download_state == State::DOWNLOADING) /// != in case of completeWithState
if (download_state == State::DOWNLOADING)
resetDownloadingStateUnlocked(segment_lock);
resetDownloaderUnlocked(segment_lock);
}
if (cache_writer && (is_downloader || is_last_holder))
{
cache_writer->finalize();
cache_writer.reset();
remote_file_reader.reset();
}
switch (download_state)
{
case State::SKIP_CACHE:
{
if (is_last_holder)
cache->remove(key(), offset(), cache_lock, segment_lock);
break;
}
case State::DOWNLOADED:
{
chassert(getDownloadedSizeUnlocked(segment_lock) == range().size());
chassert(getDownloadedSizeUnlocked(segment_lock) == std::filesystem::file_size(getPathInLocalCache()));
chassert(getDownloadedSizeUnlocked(segment_lock) == std::filesystem::file_size(file_path));
chassert(is_downloaded);
chassert(!cache_writer);
is_completed = true;
break;
}
case State::DOWNLOADING:
@ -580,8 +612,8 @@ void FileSegment::completeBasedOnCurrentState(std::lock_guard<std::mutex> & cach
{
LOG_TEST(log, "Remove cell {} (nothing downloaded)", range().toString());
setDownloadState(State::SKIP_CACHE);
cache->remove(key(), offset(), cache_lock, segment_lock);
setDownloadState(State::SKIP_CACHE, segment_lock);
key_transaction.remove(key(), offset(), segment_lock);
}
else
{
@ -593,42 +625,51 @@ void FileSegment::completeBasedOnCurrentState(std::lock_guard<std::mutex> & cach
* in FileSegmentsHolder represent a contiguous range, so we can resize
* it only when nobody needs it.
*/
setDownloadState(State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
setDownloadState(State::PARTIALLY_DOWNLOADED_NO_CONTINUATION, segment_lock);
/// Resize this file segment by creating a copy file segment with DOWNLOADED state,
/// but current file segment should remain PARTIALLY_DOWNLOADED_NO_CONTINUATION and with detached state,
/// because otherwise an invariant that getOrSet() returns a contiguous range of file segments will be broken
/// (this will be crucial for other file segment holder, not for current one).
cache->reduceSizeToDownloaded(key(), offset(), cache_lock, segment_lock);
key_transaction.reduceSizeToDownloaded(key(), offset(), segment_lock);
}
if (cache_writer)
{
cache_writer->finalize();
cache_writer.reset();
remote_file_reader.reset();
}
detachAssumeStateFinalized(segment_lock);
is_completed = true;
}
break;
}
default:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected state while completing file segment");
}
is_completed = true;
LOG_TEST(log, "Completed file segment: {}", getInfoForLogUnlocked(segment_lock));
}
String FileSegment::getInfoForLog() const
{
std::unique_lock segment_lock(mutex);
return getInfoForLogUnlocked(segment_lock);
auto lock = segment_guard.lock();
return getInfoForLogUnlocked(lock);
}
String FileSegment::getInfoForLogUnlocked(std::unique_lock<std::mutex> & segment_lock) const
String FileSegment::getInfoForLogUnlocked(const FileSegmentGuard::Lock & lock) const
{
WriteBufferFromOwnString info;
info << "File segment: " << range().toString() << ", ";
info << "key: " << key().toString() << ", ";
info << "state: " << download_state << ", ";
info << "downloaded size: " << getDownloadedSizeUnlocked(segment_lock) << ", ";
info << "downloaded size: " << getDownloadedSizeUnlocked(lock) << ", ";
info << "reserved size: " << reserved_size << ", ";
info << "downloader id: " << (downloader_id.empty() ? "None" : downloader_id) << ", ";
info << "current write offset: " << getCurrentWriteOffsetUnlocked(segment_lock) << ", ";
info << "first non-downloaded offset: " << getFirstNonDownloadedOffsetUnlocked(segment_lock) << ", ";
info << "current write offset: " << getCurrentWriteOffsetUnlocked(lock) << ", ";
info << "first non-downloaded offset: " << getFirstNonDownloadedOffsetUnlocked(lock) << ", ";
info << "caller id: " << getCallerId() << ", ";
info << "detached: " << is_detached << ", ";
info << "persistent: " << is_persistent;
@ -636,9 +677,9 @@ String FileSegment::getInfoForLogUnlocked(std::unique_lock<std::mutex> & segment
return info.str();
}
void FileSegment::wrapWithCacheInfo(Exception & e, const String & message, std::unique_lock<std::mutex> & segment_lock) const
void FileSegment::wrapWithCacheInfo(Exception & e, const String & message, const FileSegmentGuard::Lock & lock) const
{
e.addMessage(fmt::format("{}, current cache state: {}", message, getInfoForLogUnlocked(segment_lock)));
e.addMessage(fmt::format("{}, current cache state: {}", message, getInfoForLogUnlocked(lock)));
}
String FileSegment::stateToString(FileSegment::State state)
@ -663,169 +704,134 @@ String FileSegment::stateToString(FileSegment::State state)
void FileSegment::assertCorrectness() const
{
std::unique_lock segment_lock(mutex);
assertCorrectnessUnlocked(segment_lock);
auto lock = segment_guard.lock();
assertCorrectnessUnlocked(lock);
}
void FileSegment::assertCorrectnessUnlocked(std::unique_lock<std::mutex> & segment_lock) const
void FileSegment::assertCorrectnessUnlocked(const FileSegmentGuard::Lock & lock) const
{
auto current_downloader = getDownloaderUnlocked(segment_lock);
auto current_downloader = getDownloaderUnlocked(lock);
chassert(current_downloader.empty() == (download_state != FileSegment::State::DOWNLOADING));
chassert(!current_downloader.empty() == (download_state == FileSegment::State::DOWNLOADING));
chassert(download_state != FileSegment::State::DOWNLOADED || std::filesystem::file_size(getPathInLocalCache()) > 0);
chassert(download_state != FileSegment::State::DOWNLOADED || std::filesystem::file_size(file_path) > 0);
}
void FileSegment::throwIfDetachedUnlocked(std::unique_lock<std::mutex> & segment_lock) const
void FileSegment::throwIfDetachedUnlocked(const FileSegmentGuard::Lock & lock) const
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cache file segment is in detached state, operation not allowed. "
"It can happen when cache was concurrently dropped with SYSTEM DROP FILESYSTEM CACHE FORCE. "
"Please, retry. File segment info: {}", getInfoForLogUnlocked(segment_lock));
"Please, retry. File segment info: {}", getInfoForLogUnlocked(lock));
}
void FileSegment::assertNotDetached() const
{
std::unique_lock segment_lock(mutex);
assertNotDetachedUnlocked(segment_lock);
auto lock = segment_guard.lock();
assertNotDetachedUnlocked(lock);
}
void FileSegment::assertNotDetachedUnlocked(std::unique_lock<std::mutex> & segment_lock) const
void FileSegment::assertNotDetachedUnlocked(const FileSegmentGuard::Lock & lock) const
{
if (is_detached)
throwIfDetachedUnlocked(segment_lock);
throwIfDetachedUnlocked(lock);
}
void FileSegment::assertDetachedStatus(std::unique_lock<std::mutex> & segment_lock) const
void FileSegment::assertDetachedStatus(const FileSegmentGuard::Lock & lock) const
{
/// Detached file segment is allowed to have only a certain subset of states.
/// It should be either EMPTY or one of the finalized states.
if (download_state != State::EMPTY && !hasFinalizedStateUnlocked(segment_lock))
if (download_state != State::EMPTY
&& download_state != State::PARTIALLY_DOWNLOADED_NO_CONTINUATION
&& !hasFinalizedStateUnlocked(lock))
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Detached file segment has incorrect state: {}",
getInfoForLogUnlocked(segment_lock));
getInfoForLogUnlocked(lock));
}
}
FileSegmentPtr FileSegment::getSnapshot(const FileSegmentPtr & file_segment, std::lock_guard<std::mutex> & /* cache_lock */)
FileSegmentPtr FileSegment::getSnapshot(const FileSegmentPtr & file_segment)
{
std::unique_lock segment_lock(file_segment->mutex);
auto lock = file_segment->segment_guard.lock();
auto snapshot = std::make_shared<FileSegment>(
file_segment->offset(),
file_segment->range().size(),
file_segment->key(),
nullptr,
nullptr,
State::EMPTY,
CreateFileSegmentSettings{});
snapshot->hits_count = file_segment->getHitsCount();
snapshot->ref_count = file_segment.use_count();
snapshot->downloaded_size = file_segment->getDownloadedSizeUnlocked(segment_lock);
snapshot->downloaded_size = file_segment->getDownloadedSizeUnlocked(lock);
snapshot->download_state = file_segment->download_state;
snapshot->is_persistent = file_segment->isPersistent();
return snapshot;
}
bool FileSegment::hasFinalizedStateUnlocked(std::unique_lock<std::mutex> & /* segment_lock */) const
bool FileSegment::hasFinalizedStateUnlocked(const FileSegmentGuard::Lock &) const
{
return download_state == State::DOWNLOADED
|| download_state == State::PARTIALLY_DOWNLOADED_NO_CONTINUATION
|| download_state == State::SKIP_CACHE;
}
bool FileSegment::isDetached() const
{
std::unique_lock segment_lock(mutex);
auto lock = segment_guard.lock();
return is_detached;
}
bool FileSegment::isCompleted() const
{
std::unique_lock segment_lock(mutex);
return is_completed;
}
void FileSegment::detach(std::lock_guard<std::mutex> & /* cache_lock */, std::unique_lock<std::mutex> & segment_lock)
void FileSegment::detach(const FileSegmentGuard::Lock & lock, const KeyTransaction &)
{
if (is_detached)
return;
if (download_state == State::DOWNLOADING)
resetDownloadingStateUnlocked(segment_lock);
resetDownloadingStateUnlocked(lock);
else
setDownloadState(State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
setDownloadState(State::PARTIALLY_DOWNLOADED_NO_CONTINUATION, lock);
resetDownloaderUnlocked(segment_lock);
detachAssumeStateFinalized(segment_lock);
key_transaction_creator = nullptr;
resetDownloaderUnlocked(lock);
detachAssumeStateFinalized(lock);
}
void FileSegment::detachAssumeStateFinalized(std::unique_lock<std::mutex> & segment_lock)
void FileSegment::detachAssumeStateFinalized(const FileSegmentGuard::Lock & lock)
{
is_detached = true;
CurrentMetrics::add(CurrentMetrics::CacheDetachedFileSegments);
LOG_TEST(log, "Detached file segment: {}", getInfoForLogUnlocked(segment_lock));
is_completed = true;
//CurrentMetrics::add(CurrentMetrics::CacheDetachedFileSegments);
LOG_TEST(log, "Detached file segment: {}", getInfoForLogUnlocked(lock));
}
FileSegment::~FileSegment()
/// Completes the first file segment of the holder (if it is not completed yet) and removes it
/// from the list. Returns the iterator following the erased element.
///
/// createKeyTransaction(false) returns nullptr when the file segment has no key transaction
/// creator, so the transaction is only touched after the null check.
FileSegments::iterator FileSegmentsHolder::completeAndPopFrontImpl()
{
    auto & front_segment = file_segments.front();

    if (front_segment->isCompleted())
        return file_segments.erase(file_segments.begin());

    /// The priority lock is taken before the key transaction is created.
    auto lock = front_segment->cache->main_priority->lockShared();

    /// File segment pointer must be reset right after calling complete() and
    /// under the same mutex, because complete() checks for segment pointers.
    auto key_transaction = front_segment->createKeyTransaction(/* assert_exists */false);
    if (key_transaction)
    {
        key_transaction->queue_lock = lock;
        front_segment->completeUnlocked(*key_transaction);
    }

    return file_segments.erase(file_segments.begin());
}
FileSegmentsHolder::~FileSegmentsHolder()
{
/// In CacheableReadBufferFromRemoteFS file segment's downloader removes file segments from
/// FileSegmentsHolder right after calling file_segment->complete(), so on destruction here
/// remain only uncompleted file segments.
FileCache * cache = nullptr;
for (auto file_segment_it = file_segments.begin(); file_segment_it != file_segments.end();)
{
auto current_file_segment_it = file_segment_it;
auto & file_segment = *current_file_segment_it;
if (!cache)
cache = file_segment->cache;
try
{
bool is_detached = false;
{
std::unique_lock segment_lock(file_segment->mutex);
is_detached = file_segment->isDetached(segment_lock);
if (is_detached)
file_segment->assertDetachedStatus(segment_lock);
}
if (is_detached)
{
/// This file segment is not owned by cache, so it will be destructed
/// at this point, therefore no completion required.
file_segment_it = file_segments.erase(current_file_segment_it);
continue;
}
/// File segment pointer must be reset right after calling complete() and
/// under the same mutex, because complete() checks for segment pointers.
std::lock_guard cache_lock(cache->mutex);
file_segment->completeWithoutStateUnlocked(cache_lock);
file_segment_it = file_segments.erase(current_file_segment_it);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
file_segment_it = completeAndPopFrontImpl();
}
String FileSegmentsHolder::toString()

View File

@ -2,11 +2,13 @@
#include <boost/noncopyable.hpp>
#include <Interpreters/Cache/FileCacheKey.h>
#include <Interpreters/Cache/Guards.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/ReadBufferFromFileBase.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <IO/OpenedFileCache.h>
#include <base/getThreadId.h>
#include <list>
#include <queue>
@ -28,7 +30,10 @@ class ReadBufferFromFileBase;
class FileSegment;
using FileSegmentPtr = std::shared_ptr<FileSegment>;
using FileSegments = std::list<FileSegmentPtr>;
struct KeyTransaction;
using KeyTransactionPtr = std::unique_ptr<KeyTransaction>;
struct KeyTransactionCreator;
using KeyTransactionCreatorPtr = std::unique_ptr<KeyTransactionCreator>;
struct CreateFileSegmentSettings
{
@ -42,6 +47,7 @@ friend class FileCache;
friend struct FileSegmentsHolder;
friend class FileSegmentRangeWriter;
friend class StorageSystemFilesystemCache;
friend struct KeyTransaction;
public:
using Key = FileCacheKey;
@ -88,12 +94,11 @@ public:
size_t offset_,
size_t size_,
const Key & key_,
KeyTransactionCreatorPtr key_transaction_creator,
FileCache * cache_,
State download_state_,
const CreateFileSegmentSettings & create_settings);
~FileSegment();
State state() const;
static String stateToString(FileSegment::State state);
@ -132,7 +137,7 @@ public:
using UniqueId = std::pair<FileCacheKey, size_t>;
UniqueId getUniqueId() const { return std::pair(key(), offset()); }
String getPathInLocalCache() const;
String getPathInLocalCache() const { return file_path; }
/**
* ========== Methods for _any_ file segment's owner ========================
@ -161,6 +166,12 @@ public:
size_t getDownloadedSize() const;
OpenedFileCache::OpenedFilePtr getFile() const;
int getFD() const;
OpenedFileCache::OpenedFilePtr file;
int fd;
/// Now detached status can be used in the following cases:
/// 1. there is only 1 remaining file segment holder
/// && it does not need this segment anymore
@ -175,13 +186,13 @@ public:
/// 2. A detached file segment can still be held by some cache users, but its state became
/// immutable at the point it was detached; any non-const / stateful method will throw an
/// exception.
void detach(std::lock_guard<std::mutex> & cache_lock, std::unique_lock<std::mutex> & segment_lock);
void detach(const FileSegmentGuard::Lock &, const KeyTransaction &);
static FileSegmentPtr getSnapshot(const FileSegmentPtr & file_segment, std::lock_guard<std::mutex> & cache_lock);
static FileSegmentPtr getSnapshot(const FileSegmentPtr & file_segment);
bool isDetached() const;
bool isCompleted() const;
bool isCompleted() const { return is_completed; }
void assertCorrectness() const;
@ -201,10 +212,9 @@ public:
/// Write data into reserved space.
void write(const char * from, size_t size, size_t offset);
/// Complete file segment with a certain state.
void completeWithState(State state);
void setBroken();
void completeWithoutState();
void complete();
/// Complete file segment's part which was last written.
void completePartAndResetDownloader();
@ -219,46 +229,53 @@ public:
void resetRemoteFileReader();
FileSegmentGuard::Lock lock() const { return segment_guard.lock(); }
private:
size_t getFirstNonDownloadedOffsetUnlocked(std::unique_lock<std::mutex> & segment_lock) const;
size_t getCurrentWriteOffsetUnlocked(std::unique_lock<std::mutex> & segment_lock) const;
size_t getDownloadedSizeUnlocked(std::unique_lock<std::mutex> & segment_lock) const;
size_t getFirstNonDownloadedOffsetUnlocked(const FileSegmentGuard::Lock & lock) const;
size_t getCurrentWriteOffsetUnlocked(const FileSegmentGuard::Lock & lock) const;
size_t getDownloadedSizeUnlocked(const FileSegmentGuard::Lock & lock) const;
String getInfoForLogUnlocked(std::unique_lock<std::mutex> & segment_lock) const;
String getInfoForLogUnlocked(const FileSegmentGuard::Lock & lock) const;
String getDownloaderUnlocked(std::unique_lock<std::mutex> & segment_lock) const;
void resetDownloaderUnlocked(std::unique_lock<std::mutex> & segment_lock);
void resetDownloadingStateUnlocked(std::unique_lock<std::mutex> & segment_lock);
String getDownloaderUnlocked(const FileSegmentGuard::Lock & lock) const;
void resetDownloaderUnlocked(const FileSegmentGuard::Lock & lock);
void resetDownloadingStateUnlocked(const FileSegmentGuard::Lock & lock);
void setDownloadState(State state);
void setDownloadState(State state, const FileSegmentGuard::Lock & lock);
void setDownloadedUnlocked(std::unique_lock<std::mutex> & segment_lock);
void setDownloadFailedUnlocked(std::unique_lock<std::mutex> & segment_lock);
void setDownloadedUnlocked(const FileSegmentGuard::Lock & lock);
void setDownloadFailedUnlocked(const FileSegmentGuard::Lock & lock);
bool hasFinalizedStateUnlocked(std::unique_lock<std::mutex> & segment_lock) const;
/// Finalized state is such a state that does not need to be completed (with complete()).
bool hasFinalizedStateUnlocked(const FileSegmentGuard::Lock & lock) const;
bool isDownloaderUnlocked(std::unique_lock<std::mutex> & segment_lock) const;
bool isDownloaderUnlocked(const FileSegmentGuard::Lock & segment_lock) const;
bool isDetached(std::unique_lock<std::mutex> & /* segment_lock */) const { return is_detached; }
void detachAssumeStateFinalized(std::unique_lock<std::mutex> & segment_lock);
[[noreturn]] void throwIfDetachedUnlocked(std::unique_lock<std::mutex> & segment_lock) const;
bool isDetached(const FileSegmentGuard::Lock &) const { return is_detached; }
void detachAssumeStateFinalized(const FileSegmentGuard::Lock & lock);
[[noreturn]] void throwIfDetachedUnlocked(const FileSegmentGuard::Lock & lock) const;
void assertDetachedStatus(std::unique_lock<std::mutex> & segment_lock) const;
void assertDetachedStatus(const FileSegmentGuard::Lock & lock) const;
void assertNotDetached() const;
void assertNotDetachedUnlocked(std::unique_lock<std::mutex> & segment_lock) const;
void assertIsDownloaderUnlocked(const std::string & operation, std::unique_lock<std::mutex> & segment_lock) const;
void assertCorrectnessUnlocked(std::unique_lock<std::mutex> & segment_lock) const;
void assertNotDetachedUnlocked(const FileSegmentGuard::Lock & lock) const;
void assertIsDownloaderUnlocked(const std::string & operation, const FileSegmentGuard::Lock & lock) const;
void assertCorrectnessUnlocked(const FileSegmentGuard::Lock & lock) const;
KeyTransactionPtr createKeyTransaction(bool assert_exists = true) const;
/// complete() without any completion state is called from destructor of
/// FileSegmentsHolder. complete() might check if the caller of the method
/// is the last alive holder of the segment. Therefore, complete() and destruction
/// of the file segment pointer must be done under the same cache mutex.
void completeWithoutStateUnlocked(std::lock_guard<std::mutex> & cache_lock);
void completeBasedOnCurrentState(std::lock_guard<std::mutex> & cache_lock, std::unique_lock<std::mutex> & segment_lock);
void completeUnlocked(KeyTransaction & key_transaction);
void completePartAndResetDownloaderUnlocked(std::unique_lock<std::mutex> & segment_lock);
void completePartAndResetDownloaderUnlocked(const FileSegmentGuard::Lock & segment_lock);
void wrapWithCacheInfo(Exception & e, const String & message, std::unique_lock<std::mutex> & segment_lock) const;
void wrapWithCacheInfo(
Exception & e, const String & message, const FileSegmentGuard::Lock & segment_lock) const;
bool isLastHolder(const KeyTransaction & key_transaction) const;
Range segment_range;
@ -277,7 +294,8 @@ private:
/// 1. cache lock
/// 2. segment lock
mutable std::mutex mutex;
mutable FileSegmentGuard segment_guard;
KeyTransactionCreatorPtr key_transaction_creator;
std::condition_variable cv;
/// Protects downloaded_size access with actual write into fs.
@ -289,6 +307,7 @@ private:
mutable std::mutex download_mutex;
Key file_key;
const std::string file_path;
FileCache * cache;
Poco::Logger * log;
@ -296,7 +315,7 @@ private:
/// "detached" file segment means that it is not owned by cache ("detached" from cache).
/// In general case, all file segments are owned by cache.
bool is_detached = false;
bool is_completed = false;
std::atomic<bool> is_completed = false;
bool is_downloaded{false};
@ -314,13 +333,35 @@ struct FileSegmentsHolder : private boost::noncopyable
explicit FileSegmentsHolder(FileSegments && file_segments_) : file_segments(std::move(file_segments_)) {}
FileSegmentsHolder(FileSegmentsHolder && other) noexcept : file_segments(std::move(other.file_segments)) {}
~FileSegmentsHolder();
bool empty() const { return file_segments.empty(); }
size_t size() const { return file_segments.size(); }
String toString();
void popFront() { completeAndPopFrontImpl(); }
FileSegment & front() { return *file_segments.front(); }
FileSegment & back() { return *file_segments.back(); }
/// Appends `file_segment` to the end of the holder and returns a reference to the
/// segment that was just added.
/// The parameter is an rvalue reference, so ownership is transferred: the pointer is
/// moved into the list (no extra shared_ptr refcount round-trip) and the caller's
/// `file_segment` is left empty afterwards.
FileSegment & add(FileSegmentPtr && file_segment)
{
    /// A named rvalue reference is an lvalue; without std::move this would copy.
    file_segments.push_back(std::move(file_segment));
    return *file_segments.back();
}
FileSegments::iterator begin() { return file_segments.begin(); }
FileSegments::iterator end() { return file_segments.end(); }
private:
FileSegments file_segments{};
FileSegments::iterator completeAndPopFrontImpl();
};
using FileSegmentsHolderPtr = std::unique_ptr<FileSegmentsHolder>;
}

View File

@ -0,0 +1,79 @@
#pragma once
#include <map>
#include <memory>
#include <mutex>
#include <Interpreters/Cache/FileCache_fwd.h>
#include <boost/noncopyable.hpp>
namespace DB
{
/**
 * Guard for the whole cache object.
 *
 * Owns the mutex; lock() hands out a Lock object that keeps the mutex held
 * until that Lock object is destroyed.
 */
struct CacheGuard
{
    /// Scoped ownership of CacheGuard::mutex: the mutex is acquired in the constructor
    /// and released when the wrapped std::unique_lock is destroyed.
    struct Lock
    {
        explicit Lock(CacheGuard & guard) : lock(guard.mutex) {}
        std::unique_lock<std::mutex> lock;
    };

    std::mutex mutex;

    /// Blocks until the mutex is acquired and returns the owning Lock.
    /// The result must be kept alive for the whole critical section:
    /// a discarded Lock unlocks immediately, hence [[nodiscard]].
    [[nodiscard]] Lock lock() { return Lock(*this); }
};
/**
 * Guard for a set of keys.
 * One guard per key prefix (first three digits of the path hash).
 *
 * lock() hands out a Lock object that keeps the mutex held until that Lock
 * object is destroyed.
 */
struct KeyPrefixGuard
{
    /// Scoped ownership of KeyPrefixGuard::mutex: acquired in the constructor,
    /// released when the wrapped std::unique_lock is destroyed.
    struct Lock
    {
        explicit Lock(KeyPrefixGuard & guard) : lock(guard.mutex) {}
        std::unique_lock<std::mutex> lock;
    };

    std::mutex mutex;

    /// Blocks until the mutex is acquired and returns the owning Lock.
    /// A discarded Lock unlocks immediately, hence [[nodiscard]].
    [[nodiscard]] Lock lock() { return Lock(*this); }

    KeyPrefixGuard() = default;
};

/// Shared-ownership handle to a KeyPrefixGuard.
using KeyPrefixGuardPtr = std::shared_ptr<KeyPrefixGuard>;
/**
 * Guard for the cache priority queue.
 *
 * lock() returns a scoped Lock by value; lockShared() returns the same kind of
 * Lock behind a std::shared_ptr, so that several owners can keep the mutex held
 * until the last copy of the pointer is destroyed.
 */
struct CachePriorityQueueGuard
{
    /// Scoped ownership of CachePriorityQueueGuard::mutex: acquired in the constructor,
    /// released when the wrapped std::unique_lock is destroyed.
    struct Lock
    {
        explicit Lock(CachePriorityQueueGuard & guard) : lock(guard.mutex) {}
        std::unique_lock<std::mutex> lock;
    };

    std::mutex mutex;

    /// Blocks until the mutex is acquired and returns the owning Lock.
    /// A discarded Lock unlocks immediately, hence [[nodiscard]].
    [[nodiscard]] Lock lock() { return Lock(*this); }

    /// Same as lock(), but the Lock is heap-allocated and reference-counted.
    /// The mutex stays held while at least one copy of the returned pointer is alive.
    [[nodiscard]] std::shared_ptr<Lock> lockShared() { return std::make_shared<Lock>(*this); }

    CachePriorityQueueGuard() = default;
};
/**
 * Guard for a file segment.
 * Cache guard > key prefix guard > file segment guard.
 *
 * lock() hands out a Lock object that keeps the mutex held until that Lock
 * object is destroyed.
 */
struct FileSegmentGuard
{
    /// Scoped ownership of FileSegmentGuard::mutex: acquired in the constructor,
    /// released when the wrapped std::unique_lock is destroyed.
    struct Lock
    {
        explicit Lock(FileSegmentGuard & guard) : lock(guard.mutex) {}
        std::unique_lock<std::mutex> lock;
    };

    std::mutex mutex;

    /// Blocks until the mutex is acquired and returns the owning Lock.
    /// A discarded Lock unlocks immediately, hence [[nodiscard]].
    [[nodiscard]] Lock lock() { return Lock(*this); }
};
}

View File

@ -5,12 +5,17 @@
#include <Core/Types.h>
#include <Common/Exception.h>
#include <Interpreters/Cache/FileCacheKey.h>
#include <Interpreters/Cache/Guards.h>
namespace DB
{
class IFileCachePriority;
using FileCachePriorityPtr = std::shared_ptr<IFileCachePriority>;
using FileCachePriorityPtr = std::unique_ptr<IFileCachePriority>;
struct KeyTransaction;
using KeyTransactionPtr = std::unique_ptr<KeyTransaction>;
struct KeyTransactionCreator;
using KeyTransactionCreatorPtr = std::unique_ptr<KeyTransactionCreator>;
/// IFileCachePriority is used to maintain the priority of cached data.
class IFileCachePriority
@ -18,8 +23,10 @@ class IFileCachePriority
public:
class IIterator;
using Key = FileCacheKey;
using ReadIterator = std::unique_ptr<const IIterator>;
using WriteIterator = std::shared_ptr<IIterator>;
using KeyAndOffset = FileCacheKeyAndOffset;
using Iterator = std::shared_ptr<IIterator>;
using ConstIterator = std::shared_ptr<const IIterator>;
using Lock = CachePriorityQueueGuard::Lock;
struct FileCacheRecord
{
@ -27,8 +34,11 @@ public:
size_t offset;
size_t size;
size_t hits = 0;
KeyTransactionCreatorPtr key_transaction_creator;
FileCacheRecord(const Key & key_, size_t offset_, size_t size_) : key(key_), offset(offset_), size(size_) { }
FileCacheRecord(
const Key & key_, size_t offset_, size_t size_, KeyTransactionCreatorPtr key_transaction_creator_)
: key(key_), offset(offset_), size(size_), key_transaction_creator(std::move(key_transaction_creator_)) { }
};
/// It provides an iterator to traverse the cache priority. Under normal circumstances,
@ -48,48 +58,59 @@ public:
virtual size_t hits() const = 0;
/// Point the iterator to the next higher priority cache record.
virtual void next() const = 0;
virtual KeyTransactionPtr createKeyTransaction(const CachePriorityQueueGuard::Lock &) = 0;
virtual bool valid() const = 0;
/// Point the iterator to the next higher priority cache record.
virtual void next(const CachePriorityQueueGuard::Lock &) const = 0;
virtual bool valid(const CachePriorityQueueGuard::Lock &) const = 0;
/// Mark a cache record as recently used, it will update the priority
/// of the cache record according to different cache algorithms.
virtual void use(std::lock_guard<std::mutex> &) = 0;
/// Return result hits count.
virtual size_t use(const CachePriorityQueueGuard::Lock &) = 0;
/// Deletes an existing cached record. And to avoid pointer suspension
/// the iterator should automatically point to the next record.
virtual void removeAndGetNext(std::lock_guard<std::mutex> &) = 0;
/// Deletes an existing cached record. Return iterator to the next value.
virtual Iterator remove(const CachePriorityQueueGuard::Lock &) = 0;
virtual void incrementSize(size_t, std::lock_guard<std::mutex> &) = 0;
virtual void incrementSize(ssize_t, const CachePriorityQueueGuard::Lock &) = 0;
};
public:
virtual ~IFileCachePriority() = default;
/// Lock current priority queue. All methods must be called under this lock.
CachePriorityQueueGuard::Lock lock() { return guard.lock(); }
std::shared_ptr<CachePriorityQueueGuard::Lock> lockShared() { return guard.lockShared(); }
/// Add a cache record that did not exist before, and throw a
/// logical exception if the cache block already exists.
virtual WriteIterator add(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock) = 0;
virtual Iterator add(
const Key & key,
size_t offset,
size_t size,
KeyTransactionCreatorPtr key_transaction_creator,
const CachePriorityQueueGuard::Lock &) = 0;
/// This method is used for assertions in debug mode. So we do not care about complexity here.
/// Query whether a cache record exists. If it exists, return true. If not, return false.
virtual bool contains(const Key & key, size_t offset, std::lock_guard<std::mutex> & cache_lock) = 0;
virtual bool contains(const Key & key, size_t offset, const CachePriorityQueueGuard::Lock &) = 0;
virtual void removeAll(std::lock_guard<std::mutex> & cache_lock) = 0;
/// Returns an iterator pointing to the lowest priority cached record.
/// We can traverse all cached records through the iterator's next().
virtual ReadIterator getLowestPriorityReadIterator(std::lock_guard<std::mutex> & cache_lock) = 0;
virtual void removeAll(const CachePriorityQueueGuard::Lock &) = 0;
/// The same as getLowestPriorityReadIterator(), but it is writeable.
virtual WriteIterator getLowestPriorityWriteIterator(std::lock_guard<std::mutex> & cache_lock) = 0;
virtual Iterator getLowestPriorityIterator(const CachePriorityQueueGuard::Lock &) = 0;
virtual size_t getElementsNum(std::lock_guard<std::mutex> & cache_lock) const = 0;
virtual size_t getElementsNum(const CachePriorityQueueGuard::Lock &) const = 0;
size_t getCacheSize(std::lock_guard<std::mutex> &) const { return cache_size; }
size_t getCacheSize(const CachePriorityQueueGuard::Lock &) const { return cache_size; }
protected:
CachePriorityQueueGuard guard;
size_t max_cache_size = 0;
size_t cache_size = 0;
};
using FileCachePriorityPtr = std::unique_ptr<IFileCachePriority>;
};

View File

@ -1,4 +1,5 @@
#include <Interpreters/Cache/LRUFileCachePriority.h>
#include <Interpreters/Cache/FileCache.h>
#include <Common/CurrentMetrics.h>
namespace CurrentMetrics
@ -15,7 +16,12 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
IFileCachePriority::WriteIterator LRUFileCachePriority::add(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> &)
IFileCachePriority::Iterator LRUFileCachePriority::add(
const Key & key,
size_t offset,
size_t size,
KeyTransactionCreatorPtr key_transaction_creator,
const CachePriorityQueueGuard::Lock &)
{
#ifndef NDEBUG
for (const auto & entry : queue)
@ -28,7 +34,7 @@ IFileCachePriority::WriteIterator LRUFileCachePriority::add(const Key & key, siz
}
#endif
auto iter = queue.insert(queue.end(), FileCacheRecord(key, offset, size));
auto iter = queue.insert(queue.end(), FileCacheRecord(key, offset, size, std::move(key_transaction_creator)));
cache_size += size;
CurrentMetrics::add(CurrentMetrics::FilesystemCacheSize, size);
@ -39,7 +45,12 @@ IFileCachePriority::WriteIterator LRUFileCachePriority::add(const Key & key, siz
return std::make_shared<LRUFileCacheIterator>(this, iter);
}
bool LRUFileCachePriority::contains(const Key & key, size_t offset, std::lock_guard<std::mutex> &)
/// Creates a key transaction through the creator stored in the queue record
/// this iterator currently points to. The lock argument is not used in the body.
KeyTransactionPtr LRUFileCachePriority::LRUFileCacheIterator::createKeyTransaction(const CachePriorityQueueGuard::Lock &)
{
    auto & record = *queue_iter;
    return record.key_transaction_creator->create();
}
bool LRUFileCachePriority::contains(const Key & key, size_t offset, const CachePriorityQueueGuard::Lock &)
{
for (const auto & record : queue)
{
@ -49,7 +60,7 @@ bool LRUFileCachePriority::contains(const Key & key, size_t offset, std::lock_gu
return false;
}
void LRUFileCachePriority::removeAll(std::lock_guard<std::mutex> &)
void LRUFileCachePriority::removeAll(const CachePriorityQueueGuard::Lock &)
{
CurrentMetrics::sub(CurrentMetrics::FilesystemCacheSize, cache_size);
CurrentMetrics::sub(CurrentMetrics::FilesystemCacheElements, queue.size());
@ -60,28 +71,31 @@ void LRUFileCachePriority::removeAll(std::lock_guard<std::mutex> &)
cache_size = 0;
}
// LRUFileCachePriority::KeyAndOffset LRUFileCachePriority::pop(const CachePriorityQueueGuard::Lock & lock)
// {
// auto remove_it = getLowestPriorityIterator(lock);
// KeyAndOffset result(remove_it->key(), remove_it->offset());
// remove_it->removeAndGetNext(lock);
// return result;
// }
LRUFileCachePriority::LRUFileCacheIterator::LRUFileCacheIterator(
LRUFileCachePriority * cache_priority_, LRUFileCachePriority::LRUQueueIterator queue_iter_)
: cache_priority(cache_priority_), queue_iter(queue_iter_)
{
}
IFileCachePriority::ReadIterator LRUFileCachePriority::getLowestPriorityReadIterator(std::lock_guard<std::mutex> &)
{
return std::make_unique<const LRUFileCacheIterator>(this, queue.begin());
}
IFileCachePriority::WriteIterator LRUFileCachePriority::getLowestPriorityWriteIterator(std::lock_guard<std::mutex> &)
/// Returns an iterator positioned at the front of the LRU queue.
/// The lock argument is not used in the body.
IFileCachePriority::Iterator LRUFileCachePriority::getLowestPriorityIterator(const CachePriorityQueueGuard::Lock &)
{
    auto queue_front = queue.begin();
    return std::make_shared<LRUFileCacheIterator>(this, queue_front);
}
size_t LRUFileCachePriority::getElementsNum(std::lock_guard<std::mutex> &) const
/// Number of records currently stored in the LRU queue.
/// The lock argument is not used in the body.
size_t LRUFileCachePriority::getElementsNum(const CachePriorityQueueGuard::Lock &) const
{
    const size_t records_in_queue = queue.size();
    return records_in_queue;
}
void LRUFileCachePriority::LRUFileCacheIterator::removeAndGetNext(std::lock_guard<std::mutex> &)
LRUFileCachePriority::Iterator LRUFileCachePriority::LRUFileCacheIterator::remove(const CachePriorityQueueGuard::Lock &)
{
cache_priority->cache_size -= queue_iter->size;
@ -90,20 +104,22 @@ void LRUFileCachePriority::LRUFileCacheIterator::removeAndGetNext(std::lock_guar
LOG_TRACE(cache_priority->log, "Removed entry from LRU queue, key: {}, offset: {}", queue_iter->key.toString(), queue_iter->offset);
queue_iter = cache_priority->queue.erase(queue_iter);
auto next = std::make_shared<LRUFileCacheIterator>(cache_priority, cache_priority->queue.erase(queue_iter));
queue_iter = cache_priority->queue.end();
return next;
}
void LRUFileCachePriority::LRUFileCacheIterator::incrementSize(size_t size_increment, std::lock_guard<std::mutex> &)
/// Applies `size_increment` (a signed value) to three counters: the total size tracked
/// by the owning priority object, the FilesystemCacheSize metric, and the size of the
/// record this iterator points to. The lock argument is not used in the body.
void LRUFileCachePriority::LRUFileCacheIterator::incrementSize(ssize_t size_increment, const CachePriorityQueueGuard::Lock &)
{
    auto & priority = *cache_priority;
    auto & record = *queue_iter;

    priority.cache_size += size_increment;
    CurrentMetrics::add(CurrentMetrics::FilesystemCacheSize, size_increment);
    record.size += size_increment;
}
void LRUFileCachePriority::LRUFileCacheIterator::use(std::lock_guard<std::mutex> &)
size_t LRUFileCachePriority::LRUFileCacheIterator::use(const CachePriorityQueueGuard::Lock &)
{
queue_iter->hits++;
cache_priority->queue.splice(cache_priority->queue.end(), cache_priority->queue, queue_iter);
return ++queue_iter->hits;
}
};

View File

@ -2,6 +2,7 @@
#include <list>
#include <Interpreters/Cache/IFileCachePriority.h>
#include <Interpreters/Cache/FileCacheKey.h>
#include <Common/logger_useful.h>
namespace DB
@ -19,17 +20,20 @@ private:
public:
LRUFileCachePriority() = default;
WriteIterator add(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> &) override;
Iterator add(
const Key & key,
size_t offset,
size_t size,
KeyTransactionCreatorPtr key_transaction_creator,
const CachePriorityQueueGuard::Lock &) override;
bool contains(const Key & key, size_t offset, std::lock_guard<std::mutex> &) override;
bool contains(const Key & key, size_t offset, const CachePriorityQueueGuard::Lock &) override;
void removeAll(std::lock_guard<std::mutex> &) override;
void removeAll(const CachePriorityQueueGuard::Lock &) override;
ReadIterator getLowestPriorityReadIterator(std::lock_guard<std::mutex> &) override;
Iterator getLowestPriorityIterator(const CachePriorityQueueGuard::Lock &) override;
WriteIterator getLowestPriorityWriteIterator(std::lock_guard<std::mutex> &) override;
size_t getElementsNum(std::lock_guard<std::mutex> &) const override;
size_t getElementsNum(const CachePriorityQueueGuard::Lock &) const override;
private:
LRUQueue queue;
@ -39,11 +43,13 @@ private:
class LRUFileCachePriority::LRUFileCacheIterator : public IFileCachePriority::IIterator
{
public:
LRUFileCacheIterator(LRUFileCachePriority * cache_priority_, LRUFileCachePriority::LRUQueueIterator queue_iter_);
LRUFileCacheIterator(
LRUFileCachePriority * cache_priority_,
LRUFileCachePriority::LRUQueueIterator queue_iter_);
void next() const override { queue_iter++; }
void next(const CachePriorityQueueGuard::Lock &) const override { queue_iter++; }
bool valid() const override { return queue_iter != cache_priority->queue.end(); }
bool valid(const CachePriorityQueueGuard::Lock &) const override { return queue_iter != cache_priority->queue.end(); }
const Key & key() const override { return queue_iter->key; }
@ -53,11 +59,13 @@ public:
size_t hits() const override { return queue_iter->hits; }
void removeAndGetNext(std::lock_guard<std::mutex> &) override;
KeyTransactionPtr createKeyTransaction(const CachePriorityQueueGuard::Lock &) override;
void incrementSize(size_t size_increment, std::lock_guard<std::mutex> &) override;
Iterator remove(const CachePriorityQueueGuard::Lock &) override;
void use(std::lock_guard<std::mutex> &) override;
void incrementSize(ssize_t size_increment, const CachePriorityQueueGuard::Lock &) override;
size_t use(const CachePriorityQueueGuard::Lock &) override;
private:
LRUFileCachePriority * cache_priority;

View File

@ -3628,7 +3628,7 @@ IAsynchronousReader & Context::getThreadPoolReader(FilesystemReaderType type) co
{
if (!shared->asynchronous_remote_fs_reader)
{
auto pool_size = config.getUInt(".threadpool_remote_fs_reader_pool_size", 100);
auto pool_size = config.getUInt(".threadpool_remote_fs_reader_pool_size", 200);
auto queue_size = config.getUInt(".threadpool_remote_fs_reader_queue_size", 1000000);
shared->asynchronous_remote_fs_reader = std::make_unique<ThreadPoolRemoteFSReader>(pool_size, queue_size);

View File

@ -334,12 +334,12 @@ BlockIO InterpreterSystemQuery::execute()
{
auto caches = FileCacheFactory::instance().getAll();
for (const auto & [_, cache_data] : caches)
cache_data->cache->removeIfReleasable();
cache_data->cache->removeAllReleasable();
}
else
{
auto cache = FileCacheFactory::instance().get(query.filesystem_cache_path);
cache->removeIfReleasable();
cache->removeAllReleasable();
}
break;
}

View File

@ -34,6 +34,20 @@ void assertRange(
ASSERT_EQ(range.right, expected_range.right);
ASSERT_EQ(file_segment->state(), expected_state);
}
/// Overload taking the file segment by reference.
/// Prints the numbered comparison to stderr, then asserts that the segment's range
/// bounds and state are equal to the expected ones.
void assertRange(
    [[maybe_unused]] size_t assert_n, DB::FileSegment & file_segment,
    const DB::FileSegment::Range & expected_range, DB::FileSegment::State expected_state)
{
    const auto actual_range = file_segment.range();

    const auto report = fmt::format("\nAssert #{} : {} == {} (state: {} == {})\n", assert_n,
        actual_range.toString(), expected_range.toString(),
        toString(file_segment.state()), toString(expected_state));
    std::cerr << report;

    ASSERT_EQ(actual_range.left, expected_range.left);
    ASSERT_EQ(actual_range.right, expected_range.right);
    ASSERT_EQ(file_segment.state(), expected_state);
}
void printRanges(const auto & segments)
{
@ -42,9 +56,9 @@ void printRanges(const auto & segments)
std::cerr << '\n' << segment->range().toString() << " (state: " + DB::FileSegment::stateToString(segment->state()) + ")" << "\n";
}
std::vector<DB::FileSegmentPtr> fromHolder(const DB::FileSegmentsHolder & holder)
std::vector<DB::FileSegmentPtr> fromHolder(const DB::FileSegmentsHolderPtr & holder)
{
return std::vector<DB::FileSegmentPtr>(holder.file_segments.begin(), holder.file_segments.end());
return std::vector<DB::FileSegmentPtr>(holder->begin(), holder->end());
}
String getFileSegmentPath(const String & base_path, const DB::FileCache::Key & key, size_t offset)
@ -74,13 +88,13 @@ void prepareAndDownload(DB::FileSegmentPtr file_segment)
download(file_segment);
}
void complete(const DB::FileSegmentsHolder & holder)
void complete(DB::FileSegmentsHolderPtr holder)
{
for (const auto & file_segment : holder.file_segments)
for (auto it = holder->begin(); it != holder->end(); ++it)
{
ASSERT_TRUE(file_segment->getOrSetDownloader() == DB::FileSegment::getCallerId());
prepareAndDownload(file_segment);
file_segment->completeWithoutState();
ASSERT_TRUE((*it)->getOrSetDownloader() == DB::FileSegment::getCallerId());
prepareAndDownload(*it);
(*it)->complete();
}
}
@ -107,7 +121,7 @@ TEST(FileCache, get)
{
auto cache = DB::FileCache(cache_base_path, settings);
cache.initialize();
auto key = cache.hash("key1");
auto key = cache.createKeyForPath("key1");
{
auto holder = cache.getOrSet(key, 0, 10, {}); /// Add range [0, 9]
@ -127,7 +141,7 @@ TEST(FileCache, get)
assertRange(2, segments[0], DB::FileSegment::Range(0, 9), DB::FileSegment::State::DOWNLOADING);
download(segments[0]);
segments[0]->completeWithoutState();
segments[0]->complete();
assertRange(3, segments[0], DB::FileSegment::Range(0, 9), DB::FileSegment::State::DOWNLOADED);
}
@ -148,7 +162,7 @@ TEST(FileCache, get)
ASSERT_TRUE(segments[1]->getOrSetDownloader() == DB::FileSegment::getCallerId());
prepareAndDownload(segments[1]);
segments[1]->completeWithoutState();
segments[1]->complete();
assertRange(6, segments[1], DB::FileSegment::Range(10, 14), DB::FileSegment::State::DOWNLOADED);
}
@ -205,7 +219,7 @@ TEST(FileCache, get)
ASSERT_TRUE(segments[2]->getOrSetDownloader() == DB::FileSegment::getCallerId());
prepareAndDownload(segments[2]);
segments[2]->completeWithoutState();
segments[2]->complete();
assertRange(14, segments[3], DB::FileSegment::Range(17, 20), DB::FileSegment::State::DOWNLOADED);
@ -246,7 +260,7 @@ TEST(FileCache, get)
ASSERT_TRUE(segments[3]->getOrSetDownloader() == DB::FileSegment::getCallerId());
prepareAndDownload(segments[3]);
segments[3]->completeWithoutState();
segments[3]->complete();
ASSERT_TRUE(segments[3]->state() == DB::FileSegment::State::DOWNLOADED);
}
@ -269,13 +283,19 @@ TEST(FileCache, get)
ASSERT_TRUE(segments[2]->getOrSetDownloader() == DB::FileSegment::getCallerId());
prepareAndDownload(segments[0]);
prepareAndDownload(segments[2]);
segments[0]->completeWithoutState();
segments[2]->completeWithoutState();
segments[0]->complete();
segments[2]->complete();
}
/// Current cache: [____][_] [][___][__]
/// ^ ^ ^^^ ^^ ^
/// 17 21 2324 26 28
/// 17 21 2324 26 27
ASSERT_EQ(cache.getFileSegmentsNum(), 5);
assertRange(25, cache.get(key, 17, 4)->front(), DB::FileSegment::Range(17, 20), DB::FileSegment::State::DOWNLOADED);
assertRange(26, cache.get(key, 21, 1)->front(), DB::FileSegment::Range(21, 21), DB::FileSegment::State::DOWNLOADED);
assertRange(27, cache.get(key, 23, 1)->front(), DB::FileSegment::Range(23, 23), DB::FileSegment::State::DOWNLOADED);
assertRange(28, cache.get(key, 24, 3)->front(), DB::FileSegment::Range(24, 26), DB::FileSegment::State::DOWNLOADED);
assertRange(29, cache.get(key, 27, 1)->front(), DB::FileSegment::Range(27, 27), DB::FileSegment::State::DOWNLOADED);
{
auto holder5 = cache.getOrSet(key, 2, 3, {}); /// Get [2, 4]
@ -292,8 +312,8 @@ TEST(FileCache, get)
ASSERT_TRUE(s1[0]->getOrSetDownloader() == DB::FileSegment::getCallerId());
prepareAndDownload(s5[0]);
prepareAndDownload(s1[0]);
s5[0]->completeWithoutState();
s1[0]->completeWithoutState();
s5[0]->complete();
s1[0]->complete();
/// Current cache: [___] [_][___][_] [__]
/// ^ ^ ^ ^ ^ ^ ^ ^
@ -313,6 +333,8 @@ TEST(FileCache, get)
/// All cache is now unreleasable because pointers are still hold
auto holder6 = cache.getOrSet(key, 0, 40, {});
std::cerr << "kssenii: " << holder6->toString() << "\n\n";
std::cerr << "kssenii: " << cache.dumpStructure(key) << "\n\n";
auto f = fromHolder(holder6);
ASSERT_EQ(f.size(), 9);
@ -395,7 +417,7 @@ TEST(FileCache, get)
}
prepareAndDownload(segments[2]);
segments[2]->completeWithoutState();
segments[2]->complete();
ASSERT_TRUE(segments[2]->state() == DB::FileSegment::State::DOWNLOADED);
other_1.join();
@ -410,10 +432,10 @@ TEST(FileCache, get)
/// state is changed not manually via segment->complete() but from destructor of holder
/// and notify_all() is also called from destructor of holder.
std::optional<DB::FileSegmentsHolder> holder;
holder.emplace(cache.getOrSet(key, 3, 23, {})); /// Get [3, 25]
DB::FileSegmentsHolderPtr holder;
holder = cache.getOrSet(key, 3, 23, {}); /// Get [3, 25]
auto segments = fromHolder(*holder);
auto segments = fromHolder(holder);
ASSERT_EQ(segments.size(), 3);
assertRange(38, segments[0], DB::FileSegment::Range(2, 4), DB::FileSegment::State::DOWNLOADED);
@ -438,7 +460,7 @@ TEST(FileCache, get)
thread_status_1.attachQueryContext(query_context_1);
auto holder_2 = cache.getOrSet(key, 3, 23, {}); /// Get [3, 25] once again
auto segments_2 = fromHolder(*holder);
auto segments_2 = fromHolder(holder);
ASSERT_EQ(segments_2.size(), 3);
assertRange(41, segments_2[0], DB::FileSegment::Range(2, 4), DB::FileSegment::State::DOWNLOADED);
@ -460,7 +482,7 @@ TEST(FileCache, get)
ASSERT_TRUE(segments_2[1]->getOrSetDownloader() == DB::FileSegment::getCallerId());
prepareAndDownload(segments_2[1]);
segments_2[1]->completeWithoutState();
segments_2[1]->complete();
});
{
@ -484,7 +506,7 @@ TEST(FileCache, get)
auto cache2 = DB::FileCache(cache_base_path, settings);
cache2.initialize();
auto key = cache2.hash("key1");
auto key = cache2.createKeyForPath("key1");
auto holder1 = cache2.getOrSet(key, 2, 28, {}); /// Get [2, 29]
@ -505,7 +527,7 @@ TEST(FileCache, get)
settings2.max_file_segment_size = 10;
auto cache2 = DB::FileCache(caches_dir / "cache2", settings2);
cache2.initialize();
auto key = cache2.hash("key1");
auto key = cache2.createKeyForPath("key1");
auto holder1 = cache2.getOrSet(key, 0, 25, {}); /// Get [0, 24]
auto segments1 = fromHolder(holder1);

View File

@ -4,8 +4,16 @@
#include <Common/scope_guard_safe.h>
#include <Common/CurrentThread.h>
#include <Common/setThreadName.h>
#include <Disks/IO/ElapsedTimeProfileEventIncrement.h>
#include <future>
namespace ProfileEvents
{
extern const Event ThreadpoolRun;
extern const Event ThreadpoolRun2;
extern const Event ThreadpoolRun3;
}
namespace DB
{
@ -15,11 +23,14 @@ using ThreadPoolCallbackRunner = std::function<std::future<Result>(std::function
/// Creates CallbackRunner that runs every callback with 'pool->scheduleOrThrow()'.
template <typename Result>
ThreadPoolCallbackRunner<Result> threadPoolCallbackRunner(ThreadPool & pool, const std::string & thread_name)
ThreadPoolCallbackRunner<Result> threadPoolCallbackRunner(ThreadPool & pool, std::string && thread_name)
{
return [pool = &pool, thread_group = CurrentThread::getGroup(), thread_name](std::function<Result()> && callback, size_t priority) mutable -> std::future<Result>
return [pool = &pool, thread_group = CurrentThread::getGroup(), thread_name = std::move(thread_name)]
(std::function<Result()> && callback, size_t priority) mutable -> std::future<Result>
{
auto task = std::make_shared<std::packaged_task<Result()>>([thread_group, thread_name, callback = std::move(callback)]() -> Result
ElapsedUSProfileEventIncrement measure_time(ProfileEvents::ThreadpoolRun);
auto task = std::make_shared<std::packaged_task<Result()>>(
[thread_group, thread_name, callback = std::move(callback)]() -> Result
{
if (thread_group)
CurrentThread::attachTo(thread_group);
@ -37,7 +48,7 @@ ThreadPoolCallbackRunner<Result> threadPoolCallbackRunner(ThreadPool & pool, con
auto future = task->get_future();
/// ThreadPool is using "bigger is higher priority" instead of "smaller is more priority".
pool->scheduleOrThrow([task]{ (*task)(); }, -priority);
pool->scheduleOrThrow([task = std::move(task)]{ (*task)(); }, -priority);
return future;
};

View File

@ -66,7 +66,7 @@ bool AsynchronousReadBufferFromHDFS::hasPendingDataToRead()
std::future<IAsynchronousReader::Result> AsynchronousReadBufferFromHDFS::asyncReadInto(char * data, size_t size)
{
IAsynchronousReader::Request request;
request.descriptor = std::make_shared<RemoteFSFileDescriptor>(impl);
request.descriptor = std::make_shared<RemoteFSFileDescriptor>(*impl);
request.buf = data;
request.size = size;
request.offset = file_offset_of_buffer_end;

View File

@ -80,7 +80,7 @@ Pipe StorageSystemRemoteDataPaths::read(
if (cache)
{
auto cache_paths = cache->tryGetCachePaths(cache->hash(object.getPathKeyForCache()));
auto cache_paths = cache->tryGetCachePaths(cache->createKeyForPath(object.getPathKeyForCache()));
col_cache_paths->insert(Array(cache_paths.begin(), cache_paths.end()));
}
else