mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 08:32:02 +00:00
Merge branch 'master' into xuelei_dev
This commit is contained in:
commit
c47b797515
@ -333,11 +333,11 @@ class DownloadQueue
|
|||||||
{
|
{
|
||||||
friend struct CacheMetadata;
|
friend struct CacheMetadata;
|
||||||
public:
|
public:
|
||||||
void add(std::weak_ptr<FileSegment> file_segment)
|
void add(FileSegmentPtr file_segment)
|
||||||
{
|
{
|
||||||
{
|
{
|
||||||
std::lock_guard lock(mutex);
|
std::lock_guard lock(mutex);
|
||||||
queue.push(file_segment);
|
queue.emplace(file_segment->key(), file_segment->offset(), file_segment);
|
||||||
}
|
}
|
||||||
|
|
||||||
CurrentMetrics::add(CurrentMetrics::FilesystemCacheDownloadQueueElements);
|
CurrentMetrics::add(CurrentMetrics::FilesystemCacheDownloadQueueElements);
|
||||||
@ -356,8 +356,19 @@ private:
|
|||||||
|
|
||||||
std::mutex mutex;
|
std::mutex mutex;
|
||||||
std::condition_variable cv;
|
std::condition_variable cv;
|
||||||
std::queue<std::weak_ptr<FileSegment>> queue;
|
|
||||||
bool cancelled = false;
|
bool cancelled = false;
|
||||||
|
|
||||||
|
struct DownloadInfo
|
||||||
|
{
|
||||||
|
CacheMetadata::Key key;
|
||||||
|
size_t offset;
|
||||||
|
/// We keep weak pointer to file segment
|
||||||
|
/// instead of just getting it from file_segment_metadata,
|
||||||
|
/// because file segment at key:offset count be removed and added back to metadata
|
||||||
|
/// before we actually started background download.
|
||||||
|
std::weak_ptr<FileSegment> file_segment;
|
||||||
|
};
|
||||||
|
std::queue<DownloadInfo> queue;
|
||||||
};
|
};
|
||||||
|
|
||||||
void CacheMetadata::downloadThreadFunc()
|
void CacheMetadata::downloadThreadFunc()
|
||||||
@ -365,6 +376,8 @@ void CacheMetadata::downloadThreadFunc()
|
|||||||
std::optional<Memory<>> memory;
|
std::optional<Memory<>> memory;
|
||||||
while (true)
|
while (true)
|
||||||
{
|
{
|
||||||
|
Key key;
|
||||||
|
size_t offset;
|
||||||
std::weak_ptr<FileSegment> file_segment_weak;
|
std::weak_ptr<FileSegment> file_segment_weak;
|
||||||
|
|
||||||
{
|
{
|
||||||
@ -379,7 +392,11 @@ void CacheMetadata::downloadThreadFunc()
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
file_segment_weak = download_queue->queue.front();
|
auto entry = download_queue->queue.front();
|
||||||
|
key = entry.key;
|
||||||
|
offset = entry.offset;
|
||||||
|
file_segment_weak = entry.file_segment;
|
||||||
|
|
||||||
download_queue->queue.pop();
|
download_queue->queue.pop();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -389,19 +406,21 @@ void CacheMetadata::downloadThreadFunc()
|
|||||||
try
|
try
|
||||||
{
|
{
|
||||||
{
|
{
|
||||||
auto file_segment = file_segment_weak.lock();
|
auto locked_key = lockKeyMetadata(key, KeyNotFoundPolicy::RETURN_NULL);
|
||||||
if (!file_segment
|
|
||||||
|| file_segment->state() != FileSegment::State::PARTIALLY_DOWNLOADED)
|
|
||||||
continue;
|
|
||||||
|
|
||||||
auto locked_key = lockKeyMetadata(file_segment->key(), KeyNotFoundPolicy::RETURN_NULL);
|
|
||||||
if (!locked_key)
|
if (!locked_key)
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
auto file_segment_metadata = locked_key->tryGetByOffset(file_segment->offset());
|
auto file_segment_metadata = locked_key->tryGetByOffset(offset);
|
||||||
if (!file_segment_metadata || file_segment_metadata->evicting())
|
if (!file_segment_metadata || file_segment_metadata->evicting())
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
|
auto file_segment = file_segment_weak.lock();
|
||||||
|
|
||||||
|
if (!file_segment
|
||||||
|
|| file_segment != file_segment_metadata->file_segment
|
||||||
|
|| file_segment->state() != FileSegment::State::PARTIALLY_DOWNLOADED)
|
||||||
|
continue;
|
||||||
|
|
||||||
holder = std::make_unique<FileSegmentsHolder>(FileSegments{file_segment});
|
holder = std::make_unique<FileSegmentsHolder>(FileSegments{file_segment});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user