diff --git a/src/Interpreters/Cache/FileCache.cpp b/src/Interpreters/Cache/FileCache.cpp index 9bc3c1cf521..d74be9db386 100644 --- a/src/Interpreters/Cache/FileCache.cpp +++ b/src/Interpreters/Cache/FileCache.cpp @@ -242,7 +242,7 @@ FileSegments FileCache::splitRangeIntoCells( current_cell_size = std::min(remaining_size, max_file_segment_size); remaining_size -= current_cell_size; - auto cell_it = addCell(key, current_pos, current_cell_size, state, settings, key_transaction); + auto cell_it = addCell(key, current_pos, current_cell_size, state, settings, key_transaction, nullptr); file_segments.push_back(cell_it->second.file_segment); current_pos += current_cell_size; @@ -401,7 +401,7 @@ FileSegmentsHolderPtr FileCache::set(const Key & key, size_t offset, size_t size if (settings.unbounded) { /// If the file is unbounded, we can create a single cell for it. - auto cell_it = addCell(key, offset, size, FileSegment::State::EMPTY, settings, *key_transaction); + auto cell_it = addCell(key, offset, size, FileSegment::State::EMPTY, settings, *key_transaction, nullptr); file_segments = {cell_it->second.file_segment}; } else @@ -469,7 +469,8 @@ FileCache::CacheCells::iterator FileCache::addCell( size_t size, FileSegment::State state, const CreateFileSegmentSettings & settings, - KeyTransaction & key_transaction) + KeyTransaction & key_transaction, + CachePriorityQueueGuard::Lock * queue_lock) { /// Create a file segment cell and put it in `files` map by [key][offset]. @@ -522,7 +523,7 @@ FileCache::CacheCells::iterator FileCache::addCell( auto file_segment = std::make_shared( offset, size, key, key_transaction.getCreator(), this, result_state, settings); - FileSegmentCell cell(std::move(file_segment), key_transaction, *main_priority); + FileSegmentCell cell(std::move(file_segment), key_transaction, *main_priority, queue_lock); auto [cell_it, inserted] = key_transaction.getOffsets().emplace(offset, std::move(cell)); assert(inserted); @@ -532,34 +533,34 @@ FileCache::CacheCells::iterator FileCache::addCell( bool FileCache::tryReserve(const Key & key, size_t offset, size_t size) { - auto main_priority_lock = main_priority->lockShared(); + auto queue_lock = main_priority->lock(); auto key_transaction = createKeyTransaction(key, KeyNotFoundPolicy::THROW); - key_transaction->queue_lock = main_priority_lock; - return tryReserveUnlocked(key, offset, size, key_transaction); + return tryReserveUnlocked(key, offset, size, key_transaction, queue_lock); } bool FileCache::tryReserveUnlocked( const Key & key, size_t offset, size_t size, - KeyTransactionPtr key_transaction) + KeyTransactionPtr key_transaction, + const CachePriorityQueueGuard::Lock & queue_lock) { - auto query_context = query_limit ? query_limit->tryGetQueryContext(key_transaction->getQueueLock()) : nullptr; + auto query_context = query_limit ? query_limit->tryGetQueryContext(queue_lock) : nullptr; bool reserved; if (query_context) { const bool query_limit_exceeded = query_context->getCacheSize() + size > query_context->getMaxCacheSize(); if (!query_limit_exceeded) - reserved = tryReserveInCache(key, offset, size, query_context, key_transaction); + reserved = tryReserveInCache(key, offset, size, query_context, key_transaction, queue_lock); else if (query_context->isSkipDownloadIfExceed()) reserved = false; else - reserved = tryReserveInQueryCache(key, offset, size, query_context, key_transaction); + reserved = tryReserveInQueryCache(key, offset, size, query_context, key_transaction, queue_lock); } else { - reserved = tryReserveInCache(key, offset, size, nullptr, key_transaction); + reserved = tryReserveInCache(key, offset, size, nullptr, key_transaction, queue_lock); } if (reserved && !key_transaction->getOffsets().created_base_directory) @@ -575,13 +576,13 @@ bool FileCache::tryReserveInQueryCache( size_t offset, size_t size, QueryLimit::QueryContextPtr query_context, - KeyTransactionPtr key_transaction) + KeyTransactionPtr, + const CachePriorityQueueGuard::Lock & queue_lock) { LOG_TEST(log, "Reserving query cache space {} for {}:{}", size, key.toString(), offset); - const auto & lock = key_transaction->getQueueLock(); auto & query_priority = query_context->getPriority(); - if (query_priority.getElementsNum(lock)) {} + if (query_priority.getElementsNum(queue_lock)) {} // struct Segment // { // Key key; @@ -707,8 +708,8 @@ bool FileCache::tryReserveInQueryCache( void FileCache::iterateAndCollectKeyLocks( IFileCachePriority & priority, IterateAndCollectLocksFunc && func, - const CachePriorityQueueGuard::Lock & lock, - KeyTransactionsMap & locked_map) + KeyTransactionsMap & locked_map, + const CachePriorityQueueGuard::Lock & queue_lock) { priority.iterate([&, func = std::move(func)](const IFileCachePriority::Entry & entry) { @@ -726,7 +727,7 @@ void FileCache::iterateAndCollectKeyLocks( locked_map.emplace(entry.key.key_prefix, current); return res.iteration_result; - }, lock); + }, queue_lock); } bool FileCache::tryReserveInCache( @@ -734,12 +735,12 @@ bool FileCache::tryReserveInCache( size_t offset, size_t size, QueryLimit::QueryContextPtr query_context, - KeyTransactionPtr key_transaction) + KeyTransactionPtr key_transaction, + const CachePriorityQueueGuard::Lock & queue_lock) { LOG_TEST(log, "Reserving space {} for {}:{}", size, key.toString(), offset); - const auto & lock = key_transaction->queue_lock; - size_t queue_size = main_priority->getElementsNum(*lock); + size_t queue_size = main_priority->getElementsNum(queue_lock); chassert(queue_size <= max_element_size); auto * cell_for_reserve = key_transaction->getOffsets().tryGet(offset); @@ -753,7 +754,7 @@ bool FileCache::tryReserveInCache( { /// max_size == 0 means unlimited cache size, /// max_element_size means unlimited number of cache elements. - return (max_size != 0 && main_priority->getCacheSize(*lock) + size - removed_size > max_size) + return (max_size != 0 && main_priority->getCacheSize(queue_lock) + size - removed_size > max_size) || (max_element_size != 0 && queue_size > max_element_size); }; @@ -804,7 +805,7 @@ bool FileCache::tryReserveInCache( { remove_current_it = true; cell->queue_iterator = {}; - locked_key.remove(file_segment); + locked_key.remove(file_segment, queue_lock); break; } } @@ -817,7 +818,7 @@ bool FileCache::tryReserveInCache( return { IterationResult::REMOVE_AND_CONTINUE, save_key_transaction }; return { IterationResult::CONTINUE, save_key_transaction }; - }, *lock, locked); + }, locked, queue_lock); if (is_overflow()) return false; @@ -829,12 +830,12 @@ bool FileCache::tryReserveInCache( /// If queue iterator already exists, we need to update the size after each space reservation. auto queue_iterator = cell_for_reserve->queue_iterator; if (queue_iterator) - queue_iterator->incrementSize(size, *lock); + queue_iterator->incrementSize(size, queue_lock); else { /// Space reservation is incremental, so cache cell is created first (with state empty), /// and queue_iterator is assigned on first space reservation attempt. - cell_for_reserve->queue_iterator = main_priority->add(key, offset, size, key_transaction->getCreator(), *lock); + cell_for_reserve->queue_iterator = main_priority->add(key, offset, size, key_transaction->getCreator(), queue_lock); } } @@ -843,22 +844,22 @@ bool FileCache::tryReserveInCache( for (const auto & offset_to_delete : transaction->delete_offsets) { auto * cell = transaction->getOffsets().get(offset_to_delete); - transaction->queue_lock = lock; - transaction->remove(cell->file_segment); + transaction->remove(cell->file_segment, queue_lock); } } - if (main_priority->getCacheSize(*lock) > (1ull << 63)) + if (main_priority->getCacheSize(queue_lock) > (1ull << 63)) throw Exception(ErrorCodes::LOGICAL_ERROR, "Cache became inconsistent. There must be a bug"); if (query_context) - query_context->reserve(key, offset, size, *key_transaction); + query_context->reserve(key, offset, size, *key_transaction, queue_lock); return true; } void FileCache::removeIfExists(const Key & key) { + auto queue_lock = main_priority->lock(); auto key_transaction = createKeyTransaction(key, KeyNotFoundPolicy::RETURN_NULL); if (!key_transaction) return; @@ -885,7 +886,7 @@ void FileCache::removeIfExists(const Key & key) continue; } - key_transaction->remove(cell->file_segment); + key_transaction->remove(cell->file_segment, queue_lock); } } @@ -906,6 +907,7 @@ void FileCache::removeAllReleasable() /// `remove_persistent_files` defines whether non-evictable by some criteria files /// (they do not comply with the cache eviction policy) should also be removed. + auto queue_lock = main_priority->lock(); main_priority->iterate([&](const QueueEntry & entry) -> IterationResult { auto key_transaction = entry.createKeyTransaction(); @@ -914,11 +916,11 @@ void FileCache::removeAllReleasable() if (cell->releasable()) { cell->queue_iterator = {}; - key_transaction->remove(cell->file_segment); + key_transaction->remove(cell->file_segment, queue_lock); return IterationResult::REMOVE_AND_CONTINUE; } return IterationResult::CONTINUE; - }, main_priority->lock()); + }, queue_lock); if (stash) { @@ -929,10 +931,10 @@ void FileCache::removeAllReleasable() } } -void KeyTransaction::remove(FileSegmentPtr file_segment) +void KeyTransaction::remove(FileSegmentPtr file_segment, const CachePriorityQueueGuard::Lock & queue_lock) { /// We must hold pointer to file segment while removing it. - remove(file_segment->key(), file_segment->offset(), file_segment->lock()); + remove(file_segment->key(), file_segment->offset(), file_segment->lock(), queue_lock); } bool KeyTransaction::isLastHolder(size_t offset) @@ -941,14 +943,11 @@ bool KeyTransaction::isLastHolder(size_t offset) return cell->file_segment.use_count() == 2; } -const CachePriorityQueueGuard::Lock & KeyTransaction::getQueueLock() const -{ - if (!queue_lock) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Queue is not locked"); - return *queue_lock; -} - -void KeyTransaction::remove(const Key & key, size_t offset, const FileSegmentGuard::Lock & segment_lock) +void KeyTransaction::remove( + const Key & key, + size_t offset, + const FileSegmentGuard::Lock & segment_lock, + const CachePriorityQueueGuard::Lock & queue_lock) { LOG_DEBUG( log, "Remove from cache. Key: {}, offset: {}", @@ -957,7 +956,7 @@ void KeyTransaction::remove(const Key & key, size_t offset, const FileSegmentGua auto * cell = offsets->get(offset); if (cell->queue_iterator) - cell->queue_iterator->remove(getQueueLock()); + cell->queue_iterator->remove(queue_lock); const auto cache_file_path = cell->file_segment->getPathInLocalCache(); cell->file_segment->detach(segment_lock, *this); @@ -988,7 +987,7 @@ void KeyTransaction::remove(const Key & key, size_t offset, const FileSegmentGua void FileCache::loadCacheInfoIntoMemory() { - auto queue_lock = main_priority->lockShared(); + auto queue_lock = main_priority->lock(); UInt64 offset = 0; size_t size = 0; @@ -1068,14 +1067,13 @@ void FileCache::loadCacheInfoIntoMemory() } auto key_transaction = createKeyTransaction(key, KeyNotFoundPolicy::CREATE_EMPTY, false); - key_transaction->queue_lock = queue_lock; key_transaction->getOffsets().created_base_directory = true; - if (tryReserveUnlocked(key, offset, size, key_transaction)) + if (tryReserveUnlocked(key, offset, size, key_transaction, queue_lock)) { auto cell_it = addCell( key, offset, size, FileSegment::State::DOWNLOADED, - CreateFileSegmentSettings(segment_kind), *key_transaction); + CreateFileSegmentSettings(segment_kind), *key_transaction, &queue_lock); queue_entries.emplace_back(cell_it->second.queue_iterator, cell_it->second.file_segment); } @@ -1104,11 +1102,15 @@ void FileCache::loadCacheInfoIntoMemory() if (file_segment.expired()) continue; - it->use(*queue_lock); + it->use(queue_lock); } } -void KeyTransaction::reduceSizeToDownloaded(const Key & key, size_t offset, const FileSegmentGuard::Lock & segment_lock) +void KeyTransaction::reduceSizeToDownloaded( + const Key & key, + size_t offset, + const FileSegmentGuard::Lock & segment_lock, + const CachePriorityQueueGuard::Lock & queue_lock) { /** * In case file was partially downloaded and it's download cannot be continued @@ -1130,13 +1132,13 @@ void KeyTransaction::reduceSizeToDownloaded(const Key & key, size_t offset, cons } assert(file_segment->downloaded_size <= file_segment->reserved_size); - assert(cell->queue_iterator->entry().size == file_segment->reserved_size); - assert(cell->queue_iterator->entry().size >= file_segment->downloaded_size); + assert((*cell->queue_iterator).size == file_segment->reserved_size); + assert((*cell->queue_iterator).size >= file_segment->downloaded_size); if (file_segment->reserved_size > file_segment->downloaded_size) { int64_t extra_size = static_cast(cell->file_segment->reserved_size) - static_cast(file_segment->downloaded_size); - cell->queue_iterator->incrementSize(-extra_size, getQueueLock()); + cell->queue_iterator->incrementSize(-extra_size, queue_lock); } CreateFileSegmentSettings create_settings(file_segment->getKind()); @@ -1145,7 +1147,7 @@ void KeyTransaction::reduceSizeToDownloaded(const Key & key, size_t offset, cons FileSegment::State::DOWNLOADED, create_settings); assert(file_segment->reserved_size == downloaded_size); - assert(cell->size() == cell->queue_iterator->size()); + assert(cell->size() == (*cell->queue_iterator).size); } FileSegmentsHolderPtr FileCache::getSnapshot() @@ -1216,7 +1218,8 @@ size_t FileCache::getFileSegmentsNum() const FileCache::FileSegmentCell::FileSegmentCell( FileSegmentPtr file_segment_, KeyTransaction & key_transaction, - IFileCachePriority & priority_queue) + IFileCachePriority & priority_queue, + CachePriorityQueueGuard::Lock * queue_lock) : file_segment(file_segment_) { /** @@ -1229,9 +1232,15 @@ FileCache::FileSegmentCell::FileSegmentCell( { case FileSegment::State::DOWNLOADED: { + if (!queue_lock) + { + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Adding file segment with state DOWNLOADED requires locked queue lock"); + } queue_iterator = priority_queue.add( file_segment->key(), file_segment->offset(), file_segment->range().size(), - key_transaction.getCreator(), key_transaction.getQueueLock()); + key_transaction.getCreator(), *queue_lock); /// TODO: add destructor break; @@ -1294,12 +1303,11 @@ std::string FileCache::CacheCells::toString() const return result; } -KeyTransaction::KeyTransaction(KeyPrefixGuardPtr guard_, FileCache::CacheCellsPtr offsets_, std::shared_ptr queue_lock_) +KeyTransaction::KeyTransaction(KeyPrefixGuardPtr guard_, FileCache::CacheCellsPtr offsets_) : guard(guard_) , lock(guard->lock()) , offsets(offsets_) , log(&Poco::Logger::get("KeyTransaction")) - , queue_lock(queue_lock_) { } @@ -1478,8 +1486,10 @@ void FileCache::QueryLimit::removeQueryContext(const std::string & query_id, con query_map.erase(query_iter); } -FileCache::QueryLimit::QueryContextPtr -FileCache::QueryLimit::getOrSetQueryContext(const std::string & query_id, const ReadSettings & settings, const CachePriorityQueueGuard::Lock &) +FileCache::QueryLimit::QueryContextPtr FileCache::QueryLimit::getOrSetQueryContext( + const std::string & query_id, + const ReadSettings & settings, + const CachePriorityQueueGuard::Lock &) { if (query_id.empty()) return nullptr; @@ -1504,7 +1514,11 @@ FileCache::QueryContextHolderPtr FileCache::getQueryContextHolder( return std::make_unique(query_id, this, std::move(context)); } -void FileCache::QueryLimit::QueryContext::remove(const Key & key, size_t offset, size_t size, KeyTransaction & key_transaction) +void FileCache::QueryLimit::QueryContext::remove( + const Key & key, + size_t offset, + size_t size, + const CachePriorityQueueGuard::Lock & queue_lock) { std::lock_guard lock(mutex); if (cache_size < size) @@ -1515,7 +1529,7 @@ void FileCache::QueryLimit::QueryContext::remove(const Key & key, size_t offset, auto record = records.find({key, offset}); if (record != records.end()) { - record->second->remove(key_transaction.getQueueLock()); + record->second->remove(queue_lock); records.erase({key, offset}); } } @@ -1523,7 +1537,11 @@ void FileCache::QueryLimit::QueryContext::remove(const Key & key, size_t offset, } void FileCache::QueryLimit::QueryContext::reserve( - const Key & key, size_t offset, size_t size, KeyTransaction & key_transaction) + const Key & key, + size_t offset, + size_t size, + const KeyTransaction & key_transaction, + const CachePriorityQueueGuard::Lock & queue_lock) { std::lock_guard lock(mutex); if (cache_size + size > max_cache_size) @@ -1539,15 +1557,15 @@ void FileCache::QueryLimit::QueryContext::reserve( auto record = records.find({key, offset}); if (record == records.end()) { - auto queue_iter = priority->add(key, offset, 0, key_transaction.getCreator(), key_transaction.getQueueLock()); + auto queue_iter = priority->add(key, offset, 0, key_transaction.getCreator(), queue_lock); record = records.insert({{key, offset}, queue_iter}).first; } - record->second->incrementSize(size, key_transaction.getQueueLock()); + record->second->incrementSize(size, queue_lock); } cache_size += size; } -void FileCache::QueryLimit::QueryContext::use(const Key & key, size_t offset, KeyTransaction & key_transaction) +void FileCache::QueryLimit::QueryContext::use(const Key & key, size_t offset, const CachePriorityQueueGuard::Lock & queue_lock) { if (skip_download_if_exceeds_query_cache) return; @@ -1555,7 +1573,7 @@ void FileCache::QueryLimit::QueryContext::use(const Key & key, size_t offset, Ke std::lock_guard lock(mutex); auto record = records.find({key, offset}); if (record != records.end()) - record->second->use(key_transaction.getQueueLock()); + record->second->use(queue_lock); } KeyTransactionPtr KeyTransactionCreator::create() diff --git a/src/Interpreters/Cache/FileCache.h b/src/Interpreters/Cache/FileCache.h index 96f6046e43c..8ae22fbcca7 100644 --- a/src/Interpreters/Cache/FileCache.h +++ b/src/Interpreters/Cache/FileCache.h @@ -144,7 +144,8 @@ private: FileSegmentCell( FileSegmentPtr file_segment_, KeyTransaction & key_transaction, - IFileCachePriority & priority_queue); + IFileCachePriority & priority_queue, + CachePriorityQueueGuard::Lock * queue_lock); FileSegmentCell(FileSegmentCell && other) noexcept : file_segment(std::move(other.file_segment)), queue_iterator(std::move(other.queue_iterator)) {} @@ -242,11 +243,11 @@ private: bool isSkipDownloadIfExceed() const { return skip_download_if_exceeds_query_cache; } - void remove(const Key & key, size_t offset, size_t size, KeyTransaction & key_transaction); + void remove(const Key & key, size_t offset, size_t size, const CachePriorityQueueGuard::Lock &); - void reserve(const Key & key, size_t offset, size_t size, KeyTransaction & key_transaction); + void reserve(const Key & key, size_t offset, size_t size, const KeyTransaction & key_transaction, const CachePriorityQueueGuard::Lock &); - void use(const Key & key, size_t offset, KeyTransaction & key_transaction); + void use(const Key & key, size_t offset, const CachePriorityQueueGuard::Lock &); }; using QueryContextPtr = std::shared_ptr; @@ -315,27 +316,31 @@ private: size_t size, FileSegment::State state, const CreateFileSegmentSettings & create_settings, - KeyTransaction & key_transaction); + KeyTransaction & key_transaction, + CachePriorityQueueGuard::Lock * queue_lock); bool tryReserveUnlocked( const Key & key, size_t offset, size_t size, - KeyTransactionPtr key_transaction); + KeyTransactionPtr key_transaction, + const CachePriorityQueueGuard::Lock &); bool tryReserveInCache( const Key & key, size_t offset, size_t size, QueryLimit::QueryContextPtr query_context, - KeyTransactionPtr key_transaction); + KeyTransactionPtr key_transaction, + const CachePriorityQueueGuard::Lock &); bool tryReserveInQueryCache( const Key & key, size_t offset, size_t size, QueryLimit::QueryContextPtr query_context, - KeyTransactionPtr key_transaction); + KeyTransactionPtr key_transaction, + const CachePriorityQueueGuard::Lock &); void removeKeyDirectoryIfExists(const Key & key, const KeyPrefixGuard::Lock & lock) const; @@ -348,8 +353,8 @@ private: static void iterateAndCollectKeyLocks( IFileCachePriority & priority, IterateAndCollectLocksFunc && func, - const CachePriorityQueueGuard::Lock & lock, - KeyTransactionsMap & locked_map); + KeyTransactionsMap & locked_map, + const CachePriorityQueueGuard::Lock & queue_lock); }; struct KeyTransactionCreator @@ -369,15 +374,15 @@ struct KeyTransaction : private boost::noncopyable { using Key = FileCacheKey; - KeyTransaction(KeyPrefixGuardPtr guard_, FileCache::CacheCellsPtr offsets_, std::shared_ptr queue_lock_ = nullptr); + KeyTransaction(KeyPrefixGuardPtr guard_, FileCache::CacheCellsPtr offsets_); - KeyTransactionCreatorPtr getCreator() { return std::make_unique(guard, offsets); } + KeyTransactionCreatorPtr getCreator() const { return std::make_unique(guard, offsets); } - void remove(FileSegmentPtr file_segment); + void reduceSizeToDownloaded(const Key & key, size_t offset, const FileSegmentGuard::Lock &, const CachePriorityQueueGuard::Lock &); - void reduceSizeToDownloaded(const Key & key, size_t offset, const FileSegmentGuard::Lock &); + void remove(FileSegmentPtr file_segment, const CachePriorityQueueGuard::Lock &); - void remove(const Key & key, size_t offset, const FileSegmentGuard::Lock &); + void remove(const Key & key, size_t offset, const FileSegmentGuard::Lock &, const CachePriorityQueueGuard::Lock &); bool isLastHolder(size_t offset); @@ -386,8 +391,6 @@ struct KeyTransaction : private boost::noncopyable std::vector delete_offsets; - const CachePriorityQueueGuard::Lock & getQueueLock() const; - private: KeyPrefixGuardPtr guard; const KeyPrefixGuard::Lock lock; @@ -395,9 +398,6 @@ private: FileCache::CacheCellsPtr offsets; Poco::Logger * log; -public: - std::shared_ptr queue_lock; - }; } diff --git a/src/Interpreters/Cache/FileCacheKey.h b/src/Interpreters/Cache/FileCacheKey.h index a8a2dd8b3ac..981c37d32a5 100644 --- a/src/Interpreters/Cache/FileCacheKey.h +++ b/src/Interpreters/Cache/FileCacheKey.h @@ -15,7 +15,7 @@ struct FileCacheKey explicit FileCacheKey(const std::string & path); - explicit FileCacheKey(const UInt128 & path); + explicit FileCacheKey(const UInt128 & key_); static FileCacheKey random(); diff --git a/src/Interpreters/Cache/FileSegment.cpp b/src/Interpreters/Cache/FileSegment.cpp index e859f566c35..40a62d7fd15 100644 --- a/src/Interpreters/Cache/FileSegment.cpp +++ b/src/Interpreters/Cache/FileSegment.cpp @@ -536,13 +536,12 @@ void FileSegment::setBroken() void FileSegment::complete() { - auto lock = cache->main_priority->lockShared(); + auto queue_lock = cache->main_priority->lock(); auto key_transaction = createKeyTransaction(); - key_transaction->queue_lock = lock; - return completeUnlocked(*key_transaction); + return completeUnlocked(*key_transaction, queue_lock); } -void FileSegment::completeUnlocked(KeyTransaction & key_transaction) +void FileSegment::completeUnlocked(KeyTransaction & key_transaction, const CachePriorityQueueGuard::Lock & queue_lock) { auto segment_lock = segment_guard.lock(); @@ -589,7 +588,7 @@ void FileSegment::completeUnlocked(KeyTransaction & key_transaction) LOG_TEST(log, "Removing temporary file segment: {}", getInfoForLogUnlocked(segment_lock)); detach(segment_lock, key_transaction); setDownloadState(State::SKIP_CACHE, segment_lock); - key_transaction.remove(key(), offset(), segment_lock); + key_transaction.remove(key(), offset(), segment_lock, queue_lock); return; } @@ -621,7 +620,7 @@ void FileSegment::completeUnlocked(KeyTransaction & key_transaction) LOG_TEST(log, "Remove cell {} (nothing downloaded)", range().toString()); setDownloadState(State::SKIP_CACHE, segment_lock); - key_transaction.remove(key(), offset(), segment_lock); + key_transaction.remove(key(), offset(), segment_lock, queue_lock); } else { @@ -639,7 +638,7 @@ void FileSegment::completeUnlocked(KeyTransaction & key_transaction) /// but current file segment should remain PARRTIALLY_DOWNLOADED_NO_CONTINUATION and with detached state, /// because otherwise an invariant that getOrSet() returns a contiguous range of file segments will be broken /// (this will be crucial for other file segment holder, not for current one). - key_transaction.reduceSizeToDownloaded(key(), offset(), segment_lock); + key_transaction.reduceSizeToDownloaded(key(), offset(), segment_lock, queue_lock); } detachAssumeStateFinalized(segment_lock); @@ -818,7 +817,7 @@ FileSegments::iterator FileSegmentsHolder::completeAndPopFrontImpl() return file_segments.erase(file_segments.begin()); } - auto lock = file_segment.cache->main_priority->lockShared(); + auto queue_lock = file_segment.cache->main_priority->lock(); /// File segment pointer must be reset right after calling complete() and /// under the same mutex, because complete() checks for segment pointers. auto key_transaction = file_segment.createKeyTransaction(/* assert_exists */false); @@ -826,12 +825,11 @@ FileSegments::iterator FileSegmentsHolder::completeAndPopFrontImpl() { auto queue_iter = key_transaction->getOffsets().tryGet(file_segment.offset())->queue_iterator; if (queue_iter) - queue_iter->use(*lock); + queue_iter->use(queue_lock); if (!file_segment.isCompleted()) { - key_transaction->queue_lock = lock; - file_segment.completeUnlocked(*key_transaction); + file_segment.completeUnlocked(*key_transaction, queue_lock); } } diff --git a/src/Interpreters/Cache/FileSegment.h b/src/Interpreters/Cache/FileSegment.h index 4df28de524c..13fd1f3d86a 100644 --- a/src/Interpreters/Cache/FileSegment.h +++ b/src/Interpreters/Cache/FileSegment.h @@ -294,7 +294,7 @@ private: /// Function might check if the caller of the method /// is the last alive holder of the segment. Therefore, completion and destruction /// of the file segment pointer must be done under the same cache mutex. - void completeUnlocked(KeyTransaction & key_transaction); + void completeUnlocked(KeyTransaction & key_transaction, const CachePriorityQueueGuard::Lock &); void completePartAndResetDownloaderUnlocked(const FileSegmentGuard::Lock & segment_lock); bool isDownloaderUnlocked(const FileSegmentGuard::Lock & segment_lock) const; diff --git a/src/Interpreters/Cache/IFileCachePriority.h b/src/Interpreters/Cache/IFileCachePriority.h index f9f7cc67949..ccef858eb4d 100644 --- a/src/Interpreters/Cache/IFileCachePriority.h +++ b/src/Interpreters/Cache/IFileCachePriority.h @@ -73,7 +73,6 @@ public: /// Lock current priority queue. All methods must be called under this lock. CachePriorityQueueGuard::Lock lock() { return guard.lock(); } - std::shared_ptr lockShared() { return guard.lockShared(); } /// Add a cache record that did not exist before, and throw a /// logical exception if the cache block already exists.