Pass queue lock implicitly

This commit is contained in:
kssenii 2023-01-06 21:36:41 +01:00
parent 3f558f09eb
commit 539f4fde1c
6 changed files with 116 additions and 101 deletions

View File

@ -242,7 +242,7 @@ FileSegments FileCache::splitRangeIntoCells(
current_cell_size = std::min(remaining_size, max_file_segment_size); current_cell_size = std::min(remaining_size, max_file_segment_size);
remaining_size -= current_cell_size; remaining_size -= current_cell_size;
auto cell_it = addCell(key, current_pos, current_cell_size, state, settings, key_transaction); auto cell_it = addCell(key, current_pos, current_cell_size, state, settings, key_transaction, nullptr);
file_segments.push_back(cell_it->second.file_segment); file_segments.push_back(cell_it->second.file_segment);
current_pos += current_cell_size; current_pos += current_cell_size;
@ -401,7 +401,7 @@ FileSegmentsHolderPtr FileCache::set(const Key & key, size_t offset, size_t size
if (settings.unbounded) if (settings.unbounded)
{ {
/// If the file is unbounded, we can create a single cell for it. /// If the file is unbounded, we can create a single cell for it.
auto cell_it = addCell(key, offset, size, FileSegment::State::EMPTY, settings, *key_transaction); auto cell_it = addCell(key, offset, size, FileSegment::State::EMPTY, settings, *key_transaction, nullptr);
file_segments = {cell_it->second.file_segment}; file_segments = {cell_it->second.file_segment};
} }
else else
@ -469,7 +469,8 @@ FileCache::CacheCells::iterator FileCache::addCell(
size_t size, size_t size,
FileSegment::State state, FileSegment::State state,
const CreateFileSegmentSettings & settings, const CreateFileSegmentSettings & settings,
KeyTransaction & key_transaction) KeyTransaction & key_transaction,
CachePriorityQueueGuard::Lock * queue_lock)
{ {
/// Create a file segment cell and put it in `files` map by [key][offset]. /// Create a file segment cell and put it in `files` map by [key][offset].
@ -522,7 +523,7 @@ FileCache::CacheCells::iterator FileCache::addCell(
auto file_segment = std::make_shared<FileSegment>( auto file_segment = std::make_shared<FileSegment>(
offset, size, key, key_transaction.getCreator(), this, result_state, settings); offset, size, key, key_transaction.getCreator(), this, result_state, settings);
FileSegmentCell cell(std::move(file_segment), key_transaction, *main_priority); FileSegmentCell cell(std::move(file_segment), key_transaction, *main_priority, queue_lock);
auto [cell_it, inserted] = key_transaction.getOffsets().emplace(offset, std::move(cell)); auto [cell_it, inserted] = key_transaction.getOffsets().emplace(offset, std::move(cell));
assert(inserted); assert(inserted);
@ -532,34 +533,34 @@ FileCache::CacheCells::iterator FileCache::addCell(
bool FileCache::tryReserve(const Key & key, size_t offset, size_t size) bool FileCache::tryReserve(const Key & key, size_t offset, size_t size)
{ {
auto main_priority_lock = main_priority->lockShared(); auto queue_lock = main_priority->lock();
auto key_transaction = createKeyTransaction(key, KeyNotFoundPolicy::THROW); auto key_transaction = createKeyTransaction(key, KeyNotFoundPolicy::THROW);
key_transaction->queue_lock = main_priority_lock; return tryReserveUnlocked(key, offset, size, key_transaction, queue_lock);
return tryReserveUnlocked(key, offset, size, key_transaction);
} }
bool FileCache::tryReserveUnlocked( bool FileCache::tryReserveUnlocked(
const Key & key, const Key & key,
size_t offset, size_t offset,
size_t size, size_t size,
KeyTransactionPtr key_transaction) KeyTransactionPtr key_transaction,
const CachePriorityQueueGuard::Lock & queue_lock)
{ {
auto query_context = query_limit ? query_limit->tryGetQueryContext(key_transaction->getQueueLock()) : nullptr; auto query_context = query_limit ? query_limit->tryGetQueryContext(queue_lock) : nullptr;
bool reserved; bool reserved;
if (query_context) if (query_context)
{ {
const bool query_limit_exceeded = query_context->getCacheSize() + size > query_context->getMaxCacheSize(); const bool query_limit_exceeded = query_context->getCacheSize() + size > query_context->getMaxCacheSize();
if (!query_limit_exceeded) if (!query_limit_exceeded)
reserved = tryReserveInCache(key, offset, size, query_context, key_transaction); reserved = tryReserveInCache(key, offset, size, query_context, key_transaction, queue_lock);
else if (query_context->isSkipDownloadIfExceed()) else if (query_context->isSkipDownloadIfExceed())
reserved = false; reserved = false;
else else
reserved = tryReserveInQueryCache(key, offset, size, query_context, key_transaction); reserved = tryReserveInQueryCache(key, offset, size, query_context, key_transaction, queue_lock);
} }
else else
{ {
reserved = tryReserveInCache(key, offset, size, nullptr, key_transaction); reserved = tryReserveInCache(key, offset, size, nullptr, key_transaction, queue_lock);
} }
if (reserved && !key_transaction->getOffsets().created_base_directory) if (reserved && !key_transaction->getOffsets().created_base_directory)
@ -575,13 +576,13 @@ bool FileCache::tryReserveInQueryCache(
size_t offset, size_t offset,
size_t size, size_t size,
QueryLimit::QueryContextPtr query_context, QueryLimit::QueryContextPtr query_context,
KeyTransactionPtr key_transaction) KeyTransactionPtr,
const CachePriorityQueueGuard::Lock & queue_lock)
{ {
LOG_TEST(log, "Reserving query cache space {} for {}:{}", size, key.toString(), offset); LOG_TEST(log, "Reserving query cache space {} for {}:{}", size, key.toString(), offset);
const auto & lock = key_transaction->getQueueLock();
auto & query_priority = query_context->getPriority(); auto & query_priority = query_context->getPriority();
if (query_priority.getElementsNum(lock)) {} if (query_priority.getElementsNum(queue_lock)) {}
// struct Segment // struct Segment
// { // {
// Key key; // Key key;
@ -707,8 +708,8 @@ bool FileCache::tryReserveInQueryCache(
void FileCache::iterateAndCollectKeyLocks( void FileCache::iterateAndCollectKeyLocks(
IFileCachePriority & priority, IFileCachePriority & priority,
IterateAndCollectLocksFunc && func, IterateAndCollectLocksFunc && func,
const CachePriorityQueueGuard::Lock & lock, KeyTransactionsMap & locked_map,
KeyTransactionsMap & locked_map) const CachePriorityQueueGuard::Lock & queue_lock)
{ {
priority.iterate([&, func = std::move(func)](const IFileCachePriority::Entry & entry) priority.iterate([&, func = std::move(func)](const IFileCachePriority::Entry & entry)
{ {
@ -726,7 +727,7 @@ void FileCache::iterateAndCollectKeyLocks(
locked_map.emplace(entry.key.key_prefix, current); locked_map.emplace(entry.key.key_prefix, current);
return res.iteration_result; return res.iteration_result;
}, lock); }, queue_lock);
} }
bool FileCache::tryReserveInCache( bool FileCache::tryReserveInCache(
@ -734,12 +735,12 @@ bool FileCache::tryReserveInCache(
size_t offset, size_t offset,
size_t size, size_t size,
QueryLimit::QueryContextPtr query_context, QueryLimit::QueryContextPtr query_context,
KeyTransactionPtr key_transaction) KeyTransactionPtr key_transaction,
const CachePriorityQueueGuard::Lock & queue_lock)
{ {
LOG_TEST(log, "Reserving space {} for {}:{}", size, key.toString(), offset); LOG_TEST(log, "Reserving space {} for {}:{}", size, key.toString(), offset);
const auto & lock = key_transaction->queue_lock;
size_t queue_size = main_priority->getElementsNum(*lock); size_t queue_size = main_priority->getElementsNum(queue_lock);
chassert(queue_size <= max_element_size); chassert(queue_size <= max_element_size);
auto * cell_for_reserve = key_transaction->getOffsets().tryGet(offset); auto * cell_for_reserve = key_transaction->getOffsets().tryGet(offset);
@ -753,7 +754,7 @@ bool FileCache::tryReserveInCache(
{ {
/// max_size == 0 means unlimited cache size, /// max_size == 0 means unlimited cache size,
/// max_element_size means unlimited number of cache elements. /// max_element_size means unlimited number of cache elements.
return (max_size != 0 && main_priority->getCacheSize(*lock) + size - removed_size > max_size) return (max_size != 0 && main_priority->getCacheSize(queue_lock) + size - removed_size > max_size)
|| (max_element_size != 0 && queue_size > max_element_size); || (max_element_size != 0 && queue_size > max_element_size);
}; };
@ -804,7 +805,7 @@ bool FileCache::tryReserveInCache(
{ {
remove_current_it = true; remove_current_it = true;
cell->queue_iterator = {}; cell->queue_iterator = {};
locked_key.remove(file_segment); locked_key.remove(file_segment, queue_lock);
break; break;
} }
} }
@ -817,7 +818,7 @@ bool FileCache::tryReserveInCache(
return { IterationResult::REMOVE_AND_CONTINUE, save_key_transaction }; return { IterationResult::REMOVE_AND_CONTINUE, save_key_transaction };
return { IterationResult::CONTINUE, save_key_transaction }; return { IterationResult::CONTINUE, save_key_transaction };
}, *lock, locked); }, locked, queue_lock);
if (is_overflow()) if (is_overflow())
return false; return false;
@ -829,12 +830,12 @@ bool FileCache::tryReserveInCache(
/// If queue iterator already exists, we need to update the size after each space reservation. /// If queue iterator already exists, we need to update the size after each space reservation.
auto queue_iterator = cell_for_reserve->queue_iterator; auto queue_iterator = cell_for_reserve->queue_iterator;
if (queue_iterator) if (queue_iterator)
queue_iterator->incrementSize(size, *lock); queue_iterator->incrementSize(size, queue_lock);
else else
{ {
/// Space reservation is incremental, so cache cell is created first (with state empty), /// Space reservation is incremental, so cache cell is created first (with state empty),
/// and queue_iterator is assigned on first space reservation attempt. /// and queue_iterator is assigned on first space reservation attempt.
cell_for_reserve->queue_iterator = main_priority->add(key, offset, size, key_transaction->getCreator(), *lock); cell_for_reserve->queue_iterator = main_priority->add(key, offset, size, key_transaction->getCreator(), queue_lock);
} }
} }
@ -843,22 +844,22 @@ bool FileCache::tryReserveInCache(
for (const auto & offset_to_delete : transaction->delete_offsets) for (const auto & offset_to_delete : transaction->delete_offsets)
{ {
auto * cell = transaction->getOffsets().get(offset_to_delete); auto * cell = transaction->getOffsets().get(offset_to_delete);
transaction->queue_lock = lock; transaction->remove(cell->file_segment, queue_lock);
transaction->remove(cell->file_segment);
} }
} }
if (main_priority->getCacheSize(*lock) > (1ull << 63)) if (main_priority->getCacheSize(queue_lock) > (1ull << 63))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cache became inconsistent. There must be a bug"); throw Exception(ErrorCodes::LOGICAL_ERROR, "Cache became inconsistent. There must be a bug");
if (query_context) if (query_context)
query_context->reserve(key, offset, size, *key_transaction); query_context->reserve(key, offset, size, *key_transaction, queue_lock);
return true; return true;
} }
void FileCache::removeIfExists(const Key & key) void FileCache::removeIfExists(const Key & key)
{ {
auto queue_lock = main_priority->lock();
auto key_transaction = createKeyTransaction(key, KeyNotFoundPolicy::RETURN_NULL); auto key_transaction = createKeyTransaction(key, KeyNotFoundPolicy::RETURN_NULL);
if (!key_transaction) if (!key_transaction)
return; return;
@ -885,7 +886,7 @@ void FileCache::removeIfExists(const Key & key)
continue; continue;
} }
key_transaction->remove(cell->file_segment); key_transaction->remove(cell->file_segment, queue_lock);
} }
} }
@ -906,6 +907,7 @@ void FileCache::removeAllReleasable()
/// `remove_persistent_files` defines whether non-evictable by some criteria files /// `remove_persistent_files` defines whether non-evictable by some criteria files
/// (they do not comply with the cache eviction policy) should also be removed. /// (they do not comply with the cache eviction policy) should also be removed.
auto queue_lock = main_priority->lock();
main_priority->iterate([&](const QueueEntry & entry) -> IterationResult main_priority->iterate([&](const QueueEntry & entry) -> IterationResult
{ {
auto key_transaction = entry.createKeyTransaction(); auto key_transaction = entry.createKeyTransaction();
@ -914,11 +916,11 @@ void FileCache::removeAllReleasable()
if (cell->releasable()) if (cell->releasable())
{ {
cell->queue_iterator = {}; cell->queue_iterator = {};
key_transaction->remove(cell->file_segment); key_transaction->remove(cell->file_segment, queue_lock);
return IterationResult::REMOVE_AND_CONTINUE; return IterationResult::REMOVE_AND_CONTINUE;
} }
return IterationResult::CONTINUE; return IterationResult::CONTINUE;
}, main_priority->lock()); }, queue_lock);
if (stash) if (stash)
{ {
@ -929,10 +931,10 @@ void FileCache::removeAllReleasable()
} }
} }
void KeyTransaction::remove(FileSegmentPtr file_segment) void KeyTransaction::remove(FileSegmentPtr file_segment, const CachePriorityQueueGuard::Lock & queue_lock)
{ {
/// We must hold pointer to file segment while removing it. /// We must hold pointer to file segment while removing it.
remove(file_segment->key(), file_segment->offset(), file_segment->lock()); remove(file_segment->key(), file_segment->offset(), file_segment->lock(), queue_lock);
} }
bool KeyTransaction::isLastHolder(size_t offset) bool KeyTransaction::isLastHolder(size_t offset)
@ -941,14 +943,11 @@ bool KeyTransaction::isLastHolder(size_t offset)
return cell->file_segment.use_count() == 2; return cell->file_segment.use_count() == 2;
} }
const CachePriorityQueueGuard::Lock & KeyTransaction::getQueueLock() const void KeyTransaction::remove(
{ const Key & key,
if (!queue_lock) size_t offset,
throw Exception(ErrorCodes::LOGICAL_ERROR, "Queue is not locked"); const FileSegmentGuard::Lock & segment_lock,
return *queue_lock; const CachePriorityQueueGuard::Lock & queue_lock)
}
void KeyTransaction::remove(const Key & key, size_t offset, const FileSegmentGuard::Lock & segment_lock)
{ {
LOG_DEBUG( LOG_DEBUG(
log, "Remove from cache. Key: {}, offset: {}", log, "Remove from cache. Key: {}, offset: {}",
@ -957,7 +956,7 @@ void KeyTransaction::remove(const Key & key, size_t offset, const FileSegmentGua
auto * cell = offsets->get(offset); auto * cell = offsets->get(offset);
if (cell->queue_iterator) if (cell->queue_iterator)
cell->queue_iterator->remove(getQueueLock()); cell->queue_iterator->remove(queue_lock);
const auto cache_file_path = cell->file_segment->getPathInLocalCache(); const auto cache_file_path = cell->file_segment->getPathInLocalCache();
cell->file_segment->detach(segment_lock, *this); cell->file_segment->detach(segment_lock, *this);
@ -988,7 +987,7 @@ void KeyTransaction::remove(const Key & key, size_t offset, const FileSegmentGua
void FileCache::loadCacheInfoIntoMemory() void FileCache::loadCacheInfoIntoMemory()
{ {
auto queue_lock = main_priority->lockShared(); auto queue_lock = main_priority->lock();
UInt64 offset = 0; UInt64 offset = 0;
size_t size = 0; size_t size = 0;
@ -1068,14 +1067,13 @@ void FileCache::loadCacheInfoIntoMemory()
} }
auto key_transaction = createKeyTransaction(key, KeyNotFoundPolicy::CREATE_EMPTY, false); auto key_transaction = createKeyTransaction(key, KeyNotFoundPolicy::CREATE_EMPTY, false);
key_transaction->queue_lock = queue_lock;
key_transaction->getOffsets().created_base_directory = true; key_transaction->getOffsets().created_base_directory = true;
if (tryReserveUnlocked(key, offset, size, key_transaction)) if (tryReserveUnlocked(key, offset, size, key_transaction, queue_lock))
{ {
auto cell_it = addCell( auto cell_it = addCell(
key, offset, size, FileSegment::State::DOWNLOADED, key, offset, size, FileSegment::State::DOWNLOADED,
CreateFileSegmentSettings(segment_kind), *key_transaction); CreateFileSegmentSettings(segment_kind), *key_transaction, &queue_lock);
queue_entries.emplace_back(cell_it->second.queue_iterator, cell_it->second.file_segment); queue_entries.emplace_back(cell_it->second.queue_iterator, cell_it->second.file_segment);
} }
@ -1104,11 +1102,15 @@ void FileCache::loadCacheInfoIntoMemory()
if (file_segment.expired()) if (file_segment.expired())
continue; continue;
it->use(*queue_lock); it->use(queue_lock);
} }
} }
void KeyTransaction::reduceSizeToDownloaded(const Key & key, size_t offset, const FileSegmentGuard::Lock & segment_lock) void KeyTransaction::reduceSizeToDownloaded(
const Key & key,
size_t offset,
const FileSegmentGuard::Lock & segment_lock,
const CachePriorityQueueGuard::Lock & queue_lock)
{ {
/** /**
* In case file was partially downloaded and it's download cannot be continued * In case file was partially downloaded and it's download cannot be continued
@ -1130,13 +1132,13 @@ void KeyTransaction::reduceSizeToDownloaded(const Key & key, size_t offset, cons
} }
assert(file_segment->downloaded_size <= file_segment->reserved_size); assert(file_segment->downloaded_size <= file_segment->reserved_size);
assert(cell->queue_iterator->entry().size == file_segment->reserved_size); assert((*cell->queue_iterator).size == file_segment->reserved_size);
assert(cell->queue_iterator->entry().size >= file_segment->downloaded_size); assert((*cell->queue_iterator).size >= file_segment->downloaded_size);
if (file_segment->reserved_size > file_segment->downloaded_size) if (file_segment->reserved_size > file_segment->downloaded_size)
{ {
int64_t extra_size = static_cast<ssize_t>(cell->file_segment->reserved_size) - static_cast<ssize_t>(file_segment->downloaded_size); int64_t extra_size = static_cast<ssize_t>(cell->file_segment->reserved_size) - static_cast<ssize_t>(file_segment->downloaded_size);
cell->queue_iterator->incrementSize(-extra_size, getQueueLock()); cell->queue_iterator->incrementSize(-extra_size, queue_lock);
} }
CreateFileSegmentSettings create_settings(file_segment->getKind()); CreateFileSegmentSettings create_settings(file_segment->getKind());
@ -1145,7 +1147,7 @@ void KeyTransaction::reduceSizeToDownloaded(const Key & key, size_t offset, cons
FileSegment::State::DOWNLOADED, create_settings); FileSegment::State::DOWNLOADED, create_settings);
assert(file_segment->reserved_size == downloaded_size); assert(file_segment->reserved_size == downloaded_size);
assert(cell->size() == cell->queue_iterator->size()); assert(cell->size() == (*cell->queue_iterator).size);
} }
FileSegmentsHolderPtr FileCache::getSnapshot() FileSegmentsHolderPtr FileCache::getSnapshot()
@ -1216,7 +1218,8 @@ size_t FileCache::getFileSegmentsNum() const
FileCache::FileSegmentCell::FileSegmentCell( FileCache::FileSegmentCell::FileSegmentCell(
FileSegmentPtr file_segment_, FileSegmentPtr file_segment_,
KeyTransaction & key_transaction, KeyTransaction & key_transaction,
IFileCachePriority & priority_queue) IFileCachePriority & priority_queue,
CachePriorityQueueGuard::Lock * queue_lock)
: file_segment(file_segment_) : file_segment(file_segment_)
{ {
/** /**
@ -1229,9 +1232,15 @@ FileCache::FileSegmentCell::FileSegmentCell(
{ {
case FileSegment::State::DOWNLOADED: case FileSegment::State::DOWNLOADED:
{ {
if (!queue_lock)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Adding file segment with state DOWNLOADED requires locked queue lock");
}
queue_iterator = priority_queue.add( queue_iterator = priority_queue.add(
file_segment->key(), file_segment->offset(), file_segment->range().size(), file_segment->key(), file_segment->offset(), file_segment->range().size(),
key_transaction.getCreator(), key_transaction.getQueueLock()); key_transaction.getCreator(), *queue_lock);
/// TODO: add destructor /// TODO: add destructor
break; break;
@ -1294,12 +1303,11 @@ std::string FileCache::CacheCells::toString() const
return result; return result;
} }
KeyTransaction::KeyTransaction(KeyPrefixGuardPtr guard_, FileCache::CacheCellsPtr offsets_, std::shared_ptr<CachePriorityQueueGuard::Lock> queue_lock_) KeyTransaction::KeyTransaction(KeyPrefixGuardPtr guard_, FileCache::CacheCellsPtr offsets_)
: guard(guard_) : guard(guard_)
, lock(guard->lock()) , lock(guard->lock())
, offsets(offsets_) , offsets(offsets_)
, log(&Poco::Logger::get("KeyTransaction")) , log(&Poco::Logger::get("KeyTransaction"))
, queue_lock(queue_lock_)
{ {
} }
@ -1478,8 +1486,10 @@ void FileCache::QueryLimit::removeQueryContext(const std::string & query_id, con
query_map.erase(query_iter); query_map.erase(query_iter);
} }
FileCache::QueryLimit::QueryContextPtr FileCache::QueryLimit::QueryContextPtr FileCache::QueryLimit::getOrSetQueryContext(
FileCache::QueryLimit::getOrSetQueryContext(const std::string & query_id, const ReadSettings & settings, const CachePriorityQueueGuard::Lock &) const std::string & query_id,
const ReadSettings & settings,
const CachePriorityQueueGuard::Lock &)
{ {
if (query_id.empty()) if (query_id.empty())
return nullptr; return nullptr;
@ -1504,7 +1514,11 @@ FileCache::QueryContextHolderPtr FileCache::getQueryContextHolder(
return std::make_unique<QueryContextHolder>(query_id, this, std::move(context)); return std::make_unique<QueryContextHolder>(query_id, this, std::move(context));
} }
void FileCache::QueryLimit::QueryContext::remove(const Key & key, size_t offset, size_t size, KeyTransaction & key_transaction) void FileCache::QueryLimit::QueryContext::remove(
const Key & key,
size_t offset,
size_t size,
const CachePriorityQueueGuard::Lock & queue_lock)
{ {
std::lock_guard lock(mutex); std::lock_guard lock(mutex);
if (cache_size < size) if (cache_size < size)
@ -1515,7 +1529,7 @@ void FileCache::QueryLimit::QueryContext::remove(const Key & key, size_t offset,
auto record = records.find({key, offset}); auto record = records.find({key, offset});
if (record != records.end()) if (record != records.end())
{ {
record->second->remove(key_transaction.getQueueLock()); record->second->remove(queue_lock);
records.erase({key, offset}); records.erase({key, offset});
} }
} }
@ -1523,7 +1537,11 @@ void FileCache::QueryLimit::QueryContext::remove(const Key & key, size_t offset,
} }
void FileCache::QueryLimit::QueryContext::reserve( void FileCache::QueryLimit::QueryContext::reserve(
const Key & key, size_t offset, size_t size, KeyTransaction & key_transaction) const Key & key,
size_t offset,
size_t size,
const KeyTransaction & key_transaction,
const CachePriorityQueueGuard::Lock & queue_lock)
{ {
std::lock_guard lock(mutex); std::lock_guard lock(mutex);
if (cache_size + size > max_cache_size) if (cache_size + size > max_cache_size)
@ -1539,15 +1557,15 @@ void FileCache::QueryLimit::QueryContext::reserve(
auto record = records.find({key, offset}); auto record = records.find({key, offset});
if (record == records.end()) if (record == records.end())
{ {
auto queue_iter = priority->add(key, offset, 0, key_transaction.getCreator(), key_transaction.getQueueLock()); auto queue_iter = priority->add(key, offset, 0, key_transaction.getCreator(), queue_lock);
record = records.insert({{key, offset}, queue_iter}).first; record = records.insert({{key, offset}, queue_iter}).first;
} }
record->second->incrementSize(size, key_transaction.getQueueLock()); record->second->incrementSize(size, queue_lock);
} }
cache_size += size; cache_size += size;
} }
void FileCache::QueryLimit::QueryContext::use(const Key & key, size_t offset, KeyTransaction & key_transaction) void FileCache::QueryLimit::QueryContext::use(const Key & key, size_t offset, const CachePriorityQueueGuard::Lock & queue_lock)
{ {
if (skip_download_if_exceeds_query_cache) if (skip_download_if_exceeds_query_cache)
return; return;
@ -1555,7 +1573,7 @@ void FileCache::QueryLimit::QueryContext::use(const Key & key, size_t offset, Ke
std::lock_guard lock(mutex); std::lock_guard lock(mutex);
auto record = records.find({key, offset}); auto record = records.find({key, offset});
if (record != records.end()) if (record != records.end())
record->second->use(key_transaction.getQueueLock()); record->second->use(queue_lock);
} }
KeyTransactionPtr KeyTransactionCreator::create() KeyTransactionPtr KeyTransactionCreator::create()

View File

@ -144,7 +144,8 @@ private:
FileSegmentCell( FileSegmentCell(
FileSegmentPtr file_segment_, FileSegmentPtr file_segment_,
KeyTransaction & key_transaction, KeyTransaction & key_transaction,
IFileCachePriority & priority_queue); IFileCachePriority & priority_queue,
CachePriorityQueueGuard::Lock * queue_lock);
FileSegmentCell(FileSegmentCell && other) noexcept FileSegmentCell(FileSegmentCell && other) noexcept
: file_segment(std::move(other.file_segment)), queue_iterator(std::move(other.queue_iterator)) {} : file_segment(std::move(other.file_segment)), queue_iterator(std::move(other.queue_iterator)) {}
@ -242,11 +243,11 @@ private:
bool isSkipDownloadIfExceed() const { return skip_download_if_exceeds_query_cache; } bool isSkipDownloadIfExceed() const { return skip_download_if_exceeds_query_cache; }
void remove(const Key & key, size_t offset, size_t size, KeyTransaction & key_transaction); void remove(const Key & key, size_t offset, size_t size, const CachePriorityQueueGuard::Lock &);
void reserve(const Key & key, size_t offset, size_t size, KeyTransaction & key_transaction); void reserve(const Key & key, size_t offset, size_t size, const KeyTransaction & key_transaction, const CachePriorityQueueGuard::Lock &);
void use(const Key & key, size_t offset, KeyTransaction & key_transaction); void use(const Key & key, size_t offset, const CachePriorityQueueGuard::Lock &);
}; };
using QueryContextPtr = std::shared_ptr<QueryContext>; using QueryContextPtr = std::shared_ptr<QueryContext>;
@ -315,27 +316,31 @@ private:
size_t size, size_t size,
FileSegment::State state, FileSegment::State state,
const CreateFileSegmentSettings & create_settings, const CreateFileSegmentSettings & create_settings,
KeyTransaction & key_transaction); KeyTransaction & key_transaction,
CachePriorityQueueGuard::Lock * queue_lock);
bool tryReserveUnlocked( bool tryReserveUnlocked(
const Key & key, const Key & key,
size_t offset, size_t offset,
size_t size, size_t size,
KeyTransactionPtr key_transaction); KeyTransactionPtr key_transaction,
const CachePriorityQueueGuard::Lock &);
bool tryReserveInCache( bool tryReserveInCache(
const Key & key, const Key & key,
size_t offset, size_t offset,
size_t size, size_t size,
QueryLimit::QueryContextPtr query_context, QueryLimit::QueryContextPtr query_context,
KeyTransactionPtr key_transaction); KeyTransactionPtr key_transaction,
const CachePriorityQueueGuard::Lock &);
bool tryReserveInQueryCache( bool tryReserveInQueryCache(
const Key & key, const Key & key,
size_t offset, size_t offset,
size_t size, size_t size,
QueryLimit::QueryContextPtr query_context, QueryLimit::QueryContextPtr query_context,
KeyTransactionPtr key_transaction); KeyTransactionPtr key_transaction,
const CachePriorityQueueGuard::Lock &);
void removeKeyDirectoryIfExists(const Key & key, const KeyPrefixGuard::Lock & lock) const; void removeKeyDirectoryIfExists(const Key & key, const KeyPrefixGuard::Lock & lock) const;
@ -348,8 +353,8 @@ private:
static void iterateAndCollectKeyLocks( static void iterateAndCollectKeyLocks(
IFileCachePriority & priority, IFileCachePriority & priority,
IterateAndCollectLocksFunc && func, IterateAndCollectLocksFunc && func,
const CachePriorityQueueGuard::Lock & lock, KeyTransactionsMap & locked_map,
KeyTransactionsMap & locked_map); const CachePriorityQueueGuard::Lock & queue_lock);
}; };
struct KeyTransactionCreator struct KeyTransactionCreator
@ -369,15 +374,15 @@ struct KeyTransaction : private boost::noncopyable
{ {
using Key = FileCacheKey; using Key = FileCacheKey;
KeyTransaction(KeyPrefixGuardPtr guard_, FileCache::CacheCellsPtr offsets_, std::shared_ptr<CachePriorityQueueGuard::Lock> queue_lock_ = nullptr); KeyTransaction(KeyPrefixGuardPtr guard_, FileCache::CacheCellsPtr offsets_);
KeyTransactionCreatorPtr getCreator() { return std::make_unique<KeyTransactionCreator>(guard, offsets); } KeyTransactionCreatorPtr getCreator() const { return std::make_unique<KeyTransactionCreator>(guard, offsets); }
void remove(FileSegmentPtr file_segment); void reduceSizeToDownloaded(const Key & key, size_t offset, const FileSegmentGuard::Lock &, const CachePriorityQueueGuard::Lock &);
void reduceSizeToDownloaded(const Key & key, size_t offset, const FileSegmentGuard::Lock &); void remove(FileSegmentPtr file_segment, const CachePriorityQueueGuard::Lock &);
void remove(const Key & key, size_t offset, const FileSegmentGuard::Lock &); void remove(const Key & key, size_t offset, const FileSegmentGuard::Lock &, const CachePriorityQueueGuard::Lock &);
bool isLastHolder(size_t offset); bool isLastHolder(size_t offset);
@ -386,8 +391,6 @@ struct KeyTransaction : private boost::noncopyable
std::vector<size_t> delete_offsets; std::vector<size_t> delete_offsets;
const CachePriorityQueueGuard::Lock & getQueueLock() const;
private: private:
KeyPrefixGuardPtr guard; KeyPrefixGuardPtr guard;
const KeyPrefixGuard::Lock lock; const KeyPrefixGuard::Lock lock;
@ -395,9 +398,6 @@ private:
FileCache::CacheCellsPtr offsets; FileCache::CacheCellsPtr offsets;
Poco::Logger * log; Poco::Logger * log;
public:
std::shared_ptr<CachePriorityQueueGuard::Lock> queue_lock;
}; };
} }

View File

@ -15,7 +15,7 @@ struct FileCacheKey
explicit FileCacheKey(const std::string & path); explicit FileCacheKey(const std::string & path);
explicit FileCacheKey(const UInt128 & path); explicit FileCacheKey(const UInt128 & key_);
static FileCacheKey random(); static FileCacheKey random();

View File

@ -536,13 +536,12 @@ void FileSegment::setBroken()
void FileSegment::complete() void FileSegment::complete()
{ {
auto lock = cache->main_priority->lockShared(); auto queue_lock = cache->main_priority->lock();
auto key_transaction = createKeyTransaction(); auto key_transaction = createKeyTransaction();
key_transaction->queue_lock = lock; return completeUnlocked(*key_transaction, queue_lock);
return completeUnlocked(*key_transaction);
} }
void FileSegment::completeUnlocked(KeyTransaction & key_transaction) void FileSegment::completeUnlocked(KeyTransaction & key_transaction, const CachePriorityQueueGuard::Lock & queue_lock)
{ {
auto segment_lock = segment_guard.lock(); auto segment_lock = segment_guard.lock();
@ -589,7 +588,7 @@ void FileSegment::completeUnlocked(KeyTransaction & key_transaction)
LOG_TEST(log, "Removing temporary file segment: {}", getInfoForLogUnlocked(segment_lock)); LOG_TEST(log, "Removing temporary file segment: {}", getInfoForLogUnlocked(segment_lock));
detach(segment_lock, key_transaction); detach(segment_lock, key_transaction);
setDownloadState(State::SKIP_CACHE, segment_lock); setDownloadState(State::SKIP_CACHE, segment_lock);
key_transaction.remove(key(), offset(), segment_lock); key_transaction.remove(key(), offset(), segment_lock, queue_lock);
return; return;
} }
@ -621,7 +620,7 @@ void FileSegment::completeUnlocked(KeyTransaction & key_transaction)
LOG_TEST(log, "Remove cell {} (nothing downloaded)", range().toString()); LOG_TEST(log, "Remove cell {} (nothing downloaded)", range().toString());
setDownloadState(State::SKIP_CACHE, segment_lock); setDownloadState(State::SKIP_CACHE, segment_lock);
key_transaction.remove(key(), offset(), segment_lock); key_transaction.remove(key(), offset(), segment_lock, queue_lock);
} }
else else
{ {
@ -639,7 +638,7 @@ void FileSegment::completeUnlocked(KeyTransaction & key_transaction)
/// but current file segment should remain PARRTIALLY_DOWNLOADED_NO_CONTINUATION and with detached state, /// but current file segment should remain PARRTIALLY_DOWNLOADED_NO_CONTINUATION and with detached state,
/// because otherwise an invariant that getOrSet() returns a contiguous range of file segments will be broken /// because otherwise an invariant that getOrSet() returns a contiguous range of file segments will be broken
/// (this will be crucial for other file segment holder, not for current one). /// (this will be crucial for other file segment holder, not for current one).
key_transaction.reduceSizeToDownloaded(key(), offset(), segment_lock); key_transaction.reduceSizeToDownloaded(key(), offset(), segment_lock, queue_lock);
} }
detachAssumeStateFinalized(segment_lock); detachAssumeStateFinalized(segment_lock);
@ -818,7 +817,7 @@ FileSegments::iterator FileSegmentsHolder::completeAndPopFrontImpl()
return file_segments.erase(file_segments.begin()); return file_segments.erase(file_segments.begin());
} }
auto lock = file_segment.cache->main_priority->lockShared(); auto queue_lock = file_segment.cache->main_priority->lock();
/// File segment pointer must be reset right after calling complete() and /// File segment pointer must be reset right after calling complete() and
/// under the same mutex, because complete() checks for segment pointers. /// under the same mutex, because complete() checks for segment pointers.
auto key_transaction = file_segment.createKeyTransaction(/* assert_exists */false); auto key_transaction = file_segment.createKeyTransaction(/* assert_exists */false);
@ -826,12 +825,11 @@ FileSegments::iterator FileSegmentsHolder::completeAndPopFrontImpl()
{ {
auto queue_iter = key_transaction->getOffsets().tryGet(file_segment.offset())->queue_iterator; auto queue_iter = key_transaction->getOffsets().tryGet(file_segment.offset())->queue_iterator;
if (queue_iter) if (queue_iter)
queue_iter->use(*lock); queue_iter->use(queue_lock);
if (!file_segment.isCompleted()) if (!file_segment.isCompleted())
{ {
key_transaction->queue_lock = lock; file_segment.completeUnlocked(*key_transaction, queue_lock);
file_segment.completeUnlocked(*key_transaction);
} }
} }

View File

@ -294,7 +294,7 @@ private:
/// Function might check if the caller of the method /// Function might check if the caller of the method
/// is the last alive holder of the segment. Therefore, completion and destruction /// is the last alive holder of the segment. Therefore, completion and destruction
/// of the file segment pointer must be done under the same cache mutex. /// of the file segment pointer must be done under the same cache mutex.
void completeUnlocked(KeyTransaction & key_transaction); void completeUnlocked(KeyTransaction & key_transaction, const CachePriorityQueueGuard::Lock &);
void completePartAndResetDownloaderUnlocked(const FileSegmentGuard::Lock & segment_lock); void completePartAndResetDownloaderUnlocked(const FileSegmentGuard::Lock & segment_lock);
bool isDownloaderUnlocked(const FileSegmentGuard::Lock & segment_lock) const; bool isDownloaderUnlocked(const FileSegmentGuard::Lock & segment_lock) const;

View File

@ -73,7 +73,6 @@ public:
/// Lock current priority queue. All methods must be called under this lock. /// Lock current priority queue. All methods must be called under this lock.
CachePriorityQueueGuard::Lock lock() { return guard.lock(); } CachePriorityQueueGuard::Lock lock() { return guard.lock(); }
std::shared_ptr<CachePriorityQueueGuard::Lock> lockShared() { return guard.lockShared(); }
/// Add a cache record that did not exist before, and throw a /// Add a cache record that did not exist before, and throw a
/// logical exception if the cache block already exists. /// logical exception if the cache block already exists.