mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-23 16:12:01 +00:00
fix better
This commit is contained in:
parent
ae86f7ceaa
commit
d9684daa6f
@ -117,6 +117,9 @@ void LRUFileCache::useCell(
|
||||
{
|
||||
/// Move to the end of the queue. The iterator remains valid.
|
||||
queue.moveToEnd(*cell.queue_iterator, cache_lock);
|
||||
|
||||
if (auto query_context = getQueryContext(cell.file_segment->getQueryId(), cache_lock))
|
||||
query_context->use(cell.file_segment->key(), cell.file_segment->offset(), cache_lock);
|
||||
}
|
||||
}
|
||||
|
||||
@ -485,7 +488,7 @@ FileSegmentsHolder LRUFileCache::setDownloading(const Key & key, size_t offset,
|
||||
return FileSegmentsHolder(std::move(file_segments));
|
||||
}
|
||||
|
||||
LRUFileCache::QueryContextPtr LRUFileCache::getCurrentQueryContext() const
|
||||
LRUFileCache::QueryContextPtr LRUFileCache::getCurrentQueryContext(std::lock_guard<std::mutex> &) const
|
||||
{
|
||||
if (!isQueryInitialized())
|
||||
return nullptr;
|
||||
@ -494,9 +497,15 @@ LRUFileCache::QueryContextPtr LRUFileCache::getCurrentQueryContext() const
|
||||
return (query_iter == query_map.end()) ? nullptr : query_iter->second;
|
||||
}
|
||||
|
||||
LRUFileCache::QueryContextPtr LRUFileCache::getQueryContext(const String & query_id, std::lock_guard<std::mutex> &) const
|
||||
{
|
||||
auto query_iter = query_map.find(query_id);
|
||||
return (query_iter == query_map.end()) ? nullptr : query_iter->second;
|
||||
}
|
||||
|
||||
bool LRUFileCache::tryReserve(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock)
|
||||
{
|
||||
auto query_context = getCurrentQueryContext();
|
||||
auto query_context = getCurrentQueryContext(cache_lock);
|
||||
|
||||
if (query_context != nullptr)
|
||||
{
|
||||
@ -607,9 +616,10 @@ LRUFileCache::ReserveResult LRUFileCache::tryReserveForQuery(const Key & key, si
|
||||
auto file_segment = cell->file_segment;
|
||||
if (file_segment)
|
||||
{
|
||||
query_context->remove(file_segment->key(), file_segment->offset(), cache_lock);
|
||||
|
||||
std::lock_guard segment_lock(file_segment->mutex);
|
||||
remove(file_segment->key(), file_segment->offset(), cache_lock, segment_lock);
|
||||
query_context->remove(file_segment->key(), file_segment->offset(), cache_lock);
|
||||
}
|
||||
}
|
||||
|
||||
@ -632,9 +642,10 @@ LRUFileCache::ReserveResult LRUFileCache::tryReserveForQuery(const Key & key, si
|
||||
auto file_segment = cell->file_segment;
|
||||
if (file_segment)
|
||||
{
|
||||
query_context->remove(file_segment->key(), file_segment->offset(), cache_lock);
|
||||
|
||||
std::lock_guard<std::mutex> segment_lock(file_segment->mutex);
|
||||
remove(file_segment->key(), file_segment->offset(), cache_lock, segment_lock);
|
||||
query_context->remove(file_segment->key(), file_segment->offset(), cache_lock);
|
||||
}
|
||||
}
|
||||
|
||||
@ -724,6 +735,15 @@ bool LRUFileCache::tryReserveForMainList(
|
||||
auto file_segment = cell->file_segment;
|
||||
if (file_segment)
|
||||
{
|
||||
/// Update query_context if needed.
|
||||
{
|
||||
auto query_id = file_segment->getQueryId();
|
||||
if (!query_id.empty())
|
||||
{
|
||||
if (auto context = getQueryContext(query_id, cache_lock))
|
||||
context->remove(file_segment->key(), file_segment->offset(), cache_lock);
|
||||
}
|
||||
}
|
||||
std::lock_guard segment_lock(file_segment->mutex);
|
||||
remove(file_segment->key(), file_segment->offset(), cache_lock, segment_lock);
|
||||
}
|
||||
@ -750,6 +770,16 @@ bool LRUFileCache::tryReserveForMainList(
|
||||
auto file_segment = cell->file_segment;
|
||||
if (file_segment)
|
||||
{
|
||||
/// Update query context if needed.
|
||||
{
|
||||
auto query_id = file_segment->getQueryId();
|
||||
if (!query_id.empty())
|
||||
{
|
||||
if (auto context = getQueryContext(query_id, cache_lock))
|
||||
context->remove(file_segment->key(), file_segment->offset(), cache_lock);
|
||||
}
|
||||
}
|
||||
|
||||
std::lock_guard<std::mutex> segment_lock(file_segment->mutex);
|
||||
remove(file_segment->key(), file_segment->offset(), cache_lock, segment_lock);
|
||||
}
|
||||
@ -760,6 +790,7 @@ bool LRUFileCache::tryReserveForMainList(
|
||||
|
||||
if (query_context)
|
||||
query_context->reserve(key, offset, size, cache_lock);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -270,14 +270,19 @@ private:
|
||||
void remove(const Key & key, size_t offset, std::lock_guard<std::mutex> & cache_lock)
|
||||
{
|
||||
auto record = records.find({key, offset});
|
||||
cache_size -= record->second->size;
|
||||
lru_queue.remove(record->second, cache_lock);
|
||||
records.erase({key, offset});
|
||||
|
||||
if (record != records.end())
|
||||
{
|
||||
cache_size -= record->second->size;
|
||||
lru_queue.remove(record->second, cache_lock);
|
||||
records.erase({key, offset});
|
||||
}
|
||||
}
|
||||
|
||||
void reserve(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock)
|
||||
{
|
||||
auto record = records.find({key, offset});
|
||||
|
||||
if (record == records.end())
|
||||
{
|
||||
auto queue_iter = lru_queue.add(key, offset, 0, cache_lock);
|
||||
@ -287,6 +292,14 @@ private:
|
||||
cache_size += size;
|
||||
}
|
||||
|
||||
void use(const Key & key, size_t offset, std::lock_guard<std::mutex> & cache_lock)
|
||||
{
|
||||
auto record = records.find({key, offset});
|
||||
|
||||
if (record != records.end())
|
||||
lru_queue.moveToEnd(record->second, cache_lock);
|
||||
}
|
||||
|
||||
size_t getRefCount() const { return ref_count; }
|
||||
|
||||
void incrementRefCount() { ref_count++; }
|
||||
@ -327,7 +340,9 @@ private:
|
||||
|
||||
void useCell(const FileSegmentCell & cell, FileSegments & result, std::lock_guard<std::mutex> & cache_lock);
|
||||
|
||||
QueryContextPtr getCurrentQueryContext() const;
|
||||
QueryContextPtr getCurrentQueryContext(std::lock_guard<std::mutex> & cache_lock) const;
|
||||
|
||||
QueryContextPtr getQueryContext(const String & query_id, std::lock_guard<std::mutex> & cache_lock) const;
|
||||
|
||||
bool tryReserve(
|
||||
const Key & key, size_t offset, size_t size,
|
||||
|
@ -162,6 +162,18 @@ String FileSegment::getDownloader() const
|
||||
return downloader_id;
|
||||
}
|
||||
|
||||
String FileSegment::getQueryId() const
|
||||
{
|
||||
std::lock_guard segment_lock(mutex);
|
||||
|
||||
auto npos = downloader_id.find(":");
|
||||
|
||||
if (npos == std::string::npos)
|
||||
return {};
|
||||
else
|
||||
return downloader_id.substr(0, npos);
|
||||
}
|
||||
|
||||
bool FileSegment::isDownloader() const
|
||||
{
|
||||
std::lock_guard segment_lock(mutex);
|
||||
|
@ -125,6 +125,8 @@ public:
|
||||
|
||||
String getDownloader() const;
|
||||
|
||||
String getQueryId() const;
|
||||
|
||||
void resetDownloader();
|
||||
|
||||
bool isDownloader() const;
|
||||
|
Loading…
Reference in New Issue
Block a user