mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 23:21:59 +00:00
Local cache for remote filesystem
This commit is contained in:
parent
1075f604ba
commit
a566099759
596
src/Common/FileCache.cpp
Normal file
596
src/Common/FileCache.cpp
Normal file
@ -0,0 +1,596 @@
|
||||
#include "FileCache.h"
|
||||
|
||||
#include <Common/randomSeed.h>
|
||||
#include <Common/SipHash.h>
|
||||
#include <Common/hex.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <pcg-random/pcg_random.hpp>
|
||||
#include <filesystem>
|
||||
|
||||
namespace fs = std::filesystem;
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace
|
||||
{
|
||||
String keyToStr(const FileCache::Key & key)
|
||||
{
|
||||
return getHexUIntLowercase(key);
|
||||
}
|
||||
}
|
||||
|
||||
FileCache::FileCache(const String & cache_base_path_, size_t max_size_, size_t max_element_size_)
|
||||
: cache_base_path(cache_base_path_), max_size(max_size_), max_element_size(max_element_size_)
|
||||
{
|
||||
}
|
||||
|
||||
FileCache::Key FileCache::hash(const String & path)
|
||||
{
|
||||
return sipHash128(path.data(), path.size());
|
||||
}
|
||||
|
||||
String FileCache::path(const Key & key, size_t offset)
|
||||
{
|
||||
auto key_str = keyToStr(key);
|
||||
return fs::path(cache_base_path) / key_str.substr(0, 3) / key_str / std::to_string(offset);
|
||||
}
|
||||
|
||||
String FileCache::path(const Key & key)
|
||||
{
|
||||
auto key_str = keyToStr(key);
|
||||
return fs::path(cache_base_path) / key_str.substr(0, 3) / key_str;
|
||||
}
|
||||
|
||||
LRUFileCache::LRUFileCache(const String & cache_base_path_, size_t max_size_, size_t max_element_size_)
|
||||
: FileCache(cache_base_path_, max_size_, max_element_size_), log(&Poco::Logger::get("LRUFileCache"))
|
||||
{
|
||||
if (fs::exists(cache_base_path))
|
||||
restore();
|
||||
else
|
||||
fs::create_directories(cache_base_path);
|
||||
}
|
||||
|
||||
void LRUFileCache::useCell(
|
||||
const FileSegmentCell & cell, FileSegments & result, [[maybe_unused]] std::lock_guard<std::mutex> & cache_lock)
|
||||
{
|
||||
assert(cell.file_segment->state() == FileSegment::State::DOWNLOADED
|
||||
|| cell.file_segment->state() == FileSegment::State::DOWNLOADING);
|
||||
|
||||
result.push_back(cell.file_segment);
|
||||
/// Move to the end of the queue. The iterator remains valid.
|
||||
queue.splice(queue.end(), queue, cell.queue_iterator);
|
||||
}
|
||||
|
||||
LRUFileCache::FileSegmentCell * LRUFileCache::getCell(
|
||||
const Key & key, size_t offset, [[maybe_unused]] std::lock_guard<std::mutex> & cache_lock)
|
||||
{
|
||||
auto it = files.find(key);
|
||||
if (it == files.end())
|
||||
return nullptr;
|
||||
|
||||
auto & offsets = it->second;
|
||||
auto cell_it = offsets.find(offset);
|
||||
if (cell_it == offsets.end())
|
||||
return nullptr;
|
||||
|
||||
return &cell_it->second;
|
||||
}
|
||||
|
||||
void LRUFileCache::removeCell(
|
||||
const Key & key, size_t offset, const LRUQueueIterator & queue_iterator, [[maybe_unused]] std::lock_guard<std::mutex> & cache_lock)
|
||||
{
|
||||
LOG_TEST(log, "Remove. Key: {}, offset: {}", keyToStr(key), offset);
|
||||
|
||||
queue.erase(queue_iterator);
|
||||
auto & offsets = files[key];
|
||||
offsets.erase(offset);
|
||||
if (offsets.empty())
|
||||
files.erase(key);
|
||||
}
|
||||
|
||||
FileSegments LRUFileCache::getImpl(
|
||||
const Key & key, const FileSegment::Range & range, [[maybe_unused]] std::lock_guard<std::mutex> & cache_lock)
|
||||
{
|
||||
/// Given range = [left, right] and non-overlapping ordered set of file segments,
|
||||
/// find list [segment1, ..., segmentN] of segments which intersect with given range.
|
||||
|
||||
auto it = files.find(key);
|
||||
if (it == files.end())
|
||||
return {};
|
||||
|
||||
const auto & file_segments = it->second;
|
||||
if (file_segments.empty())
|
||||
{
|
||||
files.erase(it);
|
||||
return {};
|
||||
}
|
||||
|
||||
FileSegments result;
|
||||
auto segment_it = file_segments.lower_bound(range.left);
|
||||
if (segment_it == file_segments.end())
|
||||
{
|
||||
/// N - last cached segment for given file key, segment{N}.offset < range.left:
|
||||
/// segment{N} segment{N}
|
||||
/// [________ [_______]
|
||||
/// [__________] OR [________]
|
||||
/// ^ ^
|
||||
/// range.left range.left
|
||||
|
||||
const auto & cell = (--file_segments.end())->second;
|
||||
if (cell.file_segment->range().right < range.left)
|
||||
return {};
|
||||
|
||||
useCell(cell, result, cache_lock);
|
||||
}
|
||||
else /// segment_it <-- segmment{k}
|
||||
{
|
||||
if (segment_it != file_segments.begin())
|
||||
{
|
||||
const auto & prev_cell = std::prev(segment_it)->second;
|
||||
const auto & prev_cell_range = prev_cell.file_segment->range();
|
||||
|
||||
if (range.left <= prev_cell_range.right)
|
||||
{
|
||||
/// segment{k-1} segment{k}
|
||||
/// [________] [_____
|
||||
/// [___________
|
||||
/// ^
|
||||
/// range.left
|
||||
|
||||
useCell(prev_cell, result, cache_lock);
|
||||
}
|
||||
}
|
||||
|
||||
/// segment{k} ... segment{k-1} segment{k} segment{k}
|
||||
/// [______ [______] [____ [________
|
||||
/// [_________ OR [________ OR [______] ^
|
||||
/// ^ ^ ^ segment{k}.offset
|
||||
/// range.left range.left range.right
|
||||
|
||||
while (segment_it != file_segments.end())
|
||||
{
|
||||
const auto & cell = segment_it->second;
|
||||
if (range.right < cell.file_segment->range().left)
|
||||
break;
|
||||
useCell(cell, result, cache_lock);
|
||||
++segment_it;
|
||||
}
|
||||
}
|
||||
|
||||
/// TODO: remove this extra debug logging.
|
||||
String ranges;
|
||||
for (const auto & s : result)
|
||||
ranges += s->range().toString() + " ";
|
||||
LOG_TEST(log, "Cache get. Key: {}, range: {}, file_segments number: {}, ranges: {}",
|
||||
keyToStr(key), range.toString(), result.size(), ranges);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
FileSegmentsHolder LRUFileCache::getOrSet(const Key & key, size_t offset, size_t size)
|
||||
{
|
||||
FileSegment::Range range(offset, offset + size - 1);
|
||||
|
||||
std::lock_guard cache_lock(mutex);
|
||||
|
||||
/// Get all segments which intersect with the given range.
|
||||
auto file_segments = getImpl(key, range, cache_lock);
|
||||
|
||||
if (file_segments.empty())
|
||||
{
|
||||
/// If there are no such file segments, try to reserve space for
|
||||
/// range [offset, offset + size) and put it in cache.
|
||||
|
||||
auto * cell = setImpl(key, range.left, range.size(), cache_lock);
|
||||
if (cell)
|
||||
file_segments = {cell->file_segment};
|
||||
else
|
||||
file_segments = {FileSegment::createEmpty(offset, size, key, this)};
|
||||
}
|
||||
else
|
||||
{
|
||||
/// There are segments [segment1, ..., segmentN]
|
||||
/// (non-overlapping, non-empty, ascending-ordered) which (maybe partially)
|
||||
/// intersect with given range.
|
||||
|
||||
/// It can have holes:
|
||||
/// [____________________] -- requested range
|
||||
/// [____] [_] [_________] -- intersecting cache [segment1, ..., segmentN]
|
||||
|
||||
/// As long as there is space in cache, try to reserve space and
|
||||
/// create a cache cell for range correspong to each hole.
|
||||
|
||||
auto it = file_segments.begin();
|
||||
auto segment_range = (*it)->range();
|
||||
|
||||
size_t current_pos;
|
||||
if (segment_range.left < range.left)
|
||||
{
|
||||
/// [_______ -- requested range
|
||||
/// [_______
|
||||
/// ^
|
||||
/// segment1
|
||||
|
||||
current_pos = segment_range.right + 1;
|
||||
++it;
|
||||
}
|
||||
else
|
||||
current_pos = range.left;
|
||||
|
||||
while (current_pos <= range.right && it != file_segments.end())
|
||||
{
|
||||
segment_range = (*it)->range();
|
||||
|
||||
if (current_pos == segment_range.left)
|
||||
{
|
||||
current_pos = segment_range.right + 1;
|
||||
++it;
|
||||
continue;
|
||||
}
|
||||
|
||||
assert(current_pos < segment_range.left);
|
||||
|
||||
auto hole_size = segment_range.left - current_pos;
|
||||
auto * cell = setImpl(key, current_pos, hole_size, cache_lock);
|
||||
if (cell)
|
||||
file_segments.insert(it, cell->file_segment);
|
||||
else
|
||||
file_segments.insert(it, FileSegment::createEmpty(current_pos, hole_size, key, this));
|
||||
|
||||
current_pos = segment_range.right + 1;
|
||||
++it;
|
||||
}
|
||||
|
||||
if (current_pos <= range.right)
|
||||
{
|
||||
/// ________] -- requested range
|
||||
/// _____]
|
||||
/// ^
|
||||
/// segmentN
|
||||
|
||||
auto hole_size = range.right - current_pos + 1;
|
||||
auto * cell = setImpl(key, current_pos, hole_size, cache_lock);
|
||||
if (cell)
|
||||
file_segments.push_back(cell->file_segment);
|
||||
else
|
||||
file_segments.push_back(FileSegment::createEmpty(current_pos, hole_size, key, this));
|
||||
}
|
||||
}
|
||||
|
||||
return FileSegmentsHolder(std::move(file_segments));
|
||||
}
|
||||
|
||||
LRUFileCache::FileSegmentCell * LRUFileCache::setImpl(
|
||||
const Key & key, size_t offset, size_t size, [[maybe_unused]] std::lock_guard<std::mutex> & cache_lock)
|
||||
{
|
||||
if (!size)
|
||||
return nullptr; /// Empty files are not cached.
|
||||
|
||||
LOG_TEST(log, "Set. Key: {}, offset: {}, size: {}", keyToStr(key), offset, size);
|
||||
|
||||
if (files[key].contains(offset))
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR,
|
||||
"Cache already exists for key: `{}`, offset: {}, size: {}", keyToStr(key), offset, size);
|
||||
|
||||
bool reserved = tryReserve(size, cache_lock);
|
||||
if (!reserved)
|
||||
return nullptr;
|
||||
|
||||
FileSegmentCell cell(
|
||||
std::make_shared<FileSegment>(offset, size, key, this),
|
||||
queue.insert(queue.end(), std::make_pair(key, offset)));
|
||||
|
||||
auto & offsets = files[key];
|
||||
|
||||
if (offsets.empty())
|
||||
{
|
||||
auto key_path = path(key);
|
||||
if (!fs::exists(key_path))
|
||||
fs::create_directories(key_path);
|
||||
}
|
||||
|
||||
auto [it, inserted] = offsets.insert({offset, std::move(cell)});
|
||||
if (!inserted)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR,
|
||||
"Failed to insert into cache key: `{}`, offset: {}, size: {}", keyToStr(key), offset, size);
|
||||
|
||||
return &(it->second);
|
||||
}
|
||||
|
||||
bool LRUFileCache::tryReserve(size_t size, [[maybe_unused]] std::lock_guard<std::mutex> & cache_lock)
|
||||
{
|
||||
auto queue_size = queue.size() + 1;
|
||||
auto removed_size = 0;
|
||||
|
||||
auto is_overflow = [&]
|
||||
{
|
||||
return (current_size + size - removed_size > max_size)
|
||||
|| (max_element_size != 0 && queue_size > max_element_size);
|
||||
};
|
||||
|
||||
std::vector<FileSegment *> to_evict;
|
||||
|
||||
auto key_it = queue.begin();
|
||||
while (is_overflow() && key_it != queue.end())
|
||||
{
|
||||
const auto [key, offset] = *key_it++;
|
||||
|
||||
auto * cell = getCell(key, offset, cache_lock);
|
||||
if (!cell)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR,
|
||||
"Cache became inconsistent. Key: {}, offset: {}", keyToStr(key), offset);
|
||||
|
||||
size_t cell_size = cell->size();
|
||||
|
||||
/// It is guaranteed that cell is not removed from cache as long as
|
||||
/// pointer to corresponding file segment is hold by any other thread.
|
||||
|
||||
if (cell->releasable())
|
||||
{
|
||||
switch (cell->file_segment->state())
|
||||
{
|
||||
case FileSegment::State::DOWNLOADED:
|
||||
{
|
||||
/// Cell will actually be removed only if
|
||||
/// we managed to reserve enough space.
|
||||
|
||||
to_evict.emplace_back(cell->file_segment.get());
|
||||
break;
|
||||
}
|
||||
default:
|
||||
{
|
||||
removeCell(key, offset, cell->queue_iterator, cache_lock);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
removed_size += cell_size;
|
||||
--queue_size;
|
||||
}
|
||||
}
|
||||
|
||||
if (is_overflow())
|
||||
return false;
|
||||
|
||||
for (auto & file_segment : to_evict)
|
||||
remove(*file_segment, cache_lock);
|
||||
|
||||
current_size += size - removed_size;
|
||||
if (current_size > (1ull << 63))
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cache became inconsistent. There must be a bug");
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void LRUFileCache::remove(const FileSegment & file_segment, [[maybe_unused]] std::lock_guard<std::mutex> & cache_lock)
|
||||
{
|
||||
const auto & key = file_segment.key();
|
||||
auto offset = file_segment.range().left;
|
||||
|
||||
const auto * cell = getCell(key, offset, cache_lock);
|
||||
if (!cell)
|
||||
throw Exception(
|
||||
ErrorCodes::LOGICAL_ERROR,
|
||||
"Attempt to remove cell which is not in cache. Key: {}, offset: {}, segment state: {}",
|
||||
keyToStr(key), offset, file_segment.state());
|
||||
|
||||
removeImpl(key, offset, cell->queue_iterator, cache_lock);
|
||||
}
|
||||
|
||||
void LRUFileCache::removeImpl(
|
||||
const Key & key, size_t offset, const LRUQueueIterator & queue_iterator,
|
||||
[[maybe_unused]] std::lock_guard<std::mutex> & cache_lock)
|
||||
{
|
||||
removeCell(key, offset, queue_iterator, cache_lock);
|
||||
|
||||
auto cache_file_path = path(key, offset);
|
||||
if (fs::exists(cache_file_path))
|
||||
{
|
||||
try
|
||||
{
|
||||
fs::remove(cache_file_path);
|
||||
|
||||
/// If we just removed the last file segment -- also remove key directory.
|
||||
if (files.find(key) == files.end())
|
||||
{
|
||||
auto key_path = path(key);
|
||||
fs::remove(key_path);
|
||||
}
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR,
|
||||
"Removal of cached file failed. Key: {}, offset: {}, path: {}, error: {}",
|
||||
keyToStr(key), offset, cache_file_path, getCurrentExceptionMessage(false));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void LRUFileCache::restore()
|
||||
{
|
||||
std::lock_guard cache_lock(mutex);
|
||||
|
||||
Key key;
|
||||
UInt64 offset;
|
||||
size_t size;
|
||||
std::vector<FileSegmentCell *> cells;
|
||||
|
||||
/// cache_base_path / key_prefix / key / offset
|
||||
|
||||
fs::directory_iterator key_prefix_it{cache_base_path};
|
||||
for (; key_prefix_it != fs::directory_iterator(); ++key_prefix_it)
|
||||
{
|
||||
fs::directory_iterator key_it{key_prefix_it->path()};
|
||||
for (; key_it != fs::directory_iterator(); ++key_it)
|
||||
{
|
||||
key = hash(key_it->path().filename());
|
||||
|
||||
fs::directory_iterator offset_it{key_it->path()};
|
||||
for (; offset_it != fs::directory_iterator(); ++offset_it)
|
||||
{
|
||||
bool parsed = tryParse<UInt64>(offset, offset_it->path().filename());
|
||||
if (!parsed)
|
||||
throw Exception(
|
||||
ErrorCodes::LOGICAL_ERROR,
|
||||
"Unexpected file in cache: cannot parse offset. Path: {}", key_it->path().string());
|
||||
|
||||
size = offset_it->file_size();
|
||||
|
||||
auto * cell = setImpl(key, offset, size, cache_lock);
|
||||
if (cell)
|
||||
{
|
||||
cell->file_segment->download_state = FileSegment::State::DOWNLOADED;
|
||||
cell->file_segment->downloader = 0;
|
||||
|
||||
cells.push_back(cell);
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG_WARNING(log,
|
||||
"Cache capacity changed (max size: {}, available: {}), cached file `{}` does not fit in cache anymore (size: {})",
|
||||
max_size, available(), key_it->path().string(), size);
|
||||
fs::remove(path(key, offset));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Shuffle cells to have random order in LRUQueue as at startup all cells have the same priority.
|
||||
pcg64 generator(randomSeed());
|
||||
std::shuffle(cells.begin(), cells.end(), generator);
|
||||
for (const auto & cell : cells)
|
||||
queue.splice(queue.end(), queue, cell->queue_iterator);
|
||||
}
|
||||
|
||||
void LRUFileCache::remove(const Key & key)
|
||||
{
|
||||
std::lock_guard cache_lock(mutex);
|
||||
|
||||
auto it = files.find(key);
|
||||
if (it == files.end())
|
||||
return;
|
||||
|
||||
const auto & offsets = it->second;
|
||||
|
||||
for (const auto & [offset, cell] : offsets)
|
||||
removeImpl(key, offset, cell.queue_iterator, cache_lock);
|
||||
|
||||
files.erase(it);
|
||||
}
|
||||
|
||||
LRUFileCache::Stat LRUFileCache::getStat()
|
||||
{
|
||||
std::lock_guard cache_lock(mutex);
|
||||
|
||||
Stat stat
|
||||
{
|
||||
.size = queue.size(),
|
||||
.available = available(),
|
||||
.downloaded_size = 0,
|
||||
.downloading_size = 0,
|
||||
};
|
||||
|
||||
for (const auto & [key, offset] : queue)
|
||||
{
|
||||
const auto * cell = getCell(key, offset, cache_lock);
|
||||
if (!cell)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR,
|
||||
"Cache became inconsistent. Key: {}, offset: {}", keyToStr(key), offset);
|
||||
|
||||
switch (cell->file_segment->state())
|
||||
{
|
||||
case FileSegment::State::DOWNLOADED:
|
||||
{
|
||||
++stat.downloaded_size;
|
||||
break;
|
||||
}
|
||||
case FileSegment::State::DOWNLOADING:
|
||||
{
|
||||
++stat.downloading_size;
|
||||
break;
|
||||
}
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return stat;
|
||||
}
|
||||
|
||||
void FileSegment::complete(State state)
|
||||
{
|
||||
if (state != State::DOWNLOADED && state != State::ERROR)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can complete segment only with DOWNLOADED or ERROR state");
|
||||
|
||||
{
|
||||
std::lock_guard segment_lock(mutex);
|
||||
|
||||
if (download_state == State::EMPTY)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot complete segment from EMPTY state");
|
||||
|
||||
std::lock_guard cache_lock(cache->mutex);
|
||||
|
||||
download_state = state;
|
||||
downloader = 0;
|
||||
|
||||
if (state == State::ERROR)
|
||||
cache->remove(*this, cache_lock);
|
||||
}
|
||||
|
||||
cv.notify_all();
|
||||
}
|
||||
|
||||
FileSegment::State FileSegment::wait()
|
||||
{
|
||||
std::unique_lock segment_lock(mutex);
|
||||
|
||||
switch (download_state)
|
||||
{
|
||||
case State::DOWNLOADING:
|
||||
{
|
||||
cv.wait(segment_lock, [this]
|
||||
{
|
||||
return download_state == State::DOWNLOADED || download_state == State::ERROR;
|
||||
});
|
||||
break;
|
||||
}
|
||||
case State::DOWNLOADED:[[fallthrough]];
|
||||
case State::ERROR:
|
||||
{
|
||||
break;
|
||||
}
|
||||
default:
|
||||
{
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to wait for segment with incorrect");
|
||||
}
|
||||
}
|
||||
|
||||
return download_state;
|
||||
}
|
||||
|
||||
void FileSegment::release()
|
||||
{
|
||||
{
|
||||
std::lock_guard segment_lock(mutex);
|
||||
|
||||
/// Empty segments are owned only by caller, not present in cache.
|
||||
if (download_state == State::EMPTY)
|
||||
return;
|
||||
|
||||
if (download_state != State::DOWNLOADED)
|
||||
{
|
||||
/// Segment is removed from cache here by downloader's FileSegmentsHolder only if
|
||||
/// downloader did not call segment->complete(State::ERROR), otherwise it is removed by downloader.
|
||||
|
||||
std::lock_guard cache_lock(cache->mutex);
|
||||
|
||||
download_state = State::ERROR;
|
||||
cache->remove(*this, cache_lock);
|
||||
}
|
||||
}
|
||||
|
||||
cv.notify_all();
|
||||
}
|
||||
|
||||
}
|
263
src/Common/FileCache.h
Normal file
263
src/Common/FileCache.h
Normal file
@ -0,0 +1,263 @@
|
||||
#pragma once
|
||||
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
#include <list>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <unordered_map>
|
||||
#include <unordered_set>
|
||||
#include <boost/noncopyable.hpp>
|
||||
#include <Core/Types.h>
|
||||
#include <map>
|
||||
#include <base/logger_useful.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class FileSegment;
|
||||
using FileSegmentPtr = std::shared_ptr<FileSegment>;
|
||||
using FileSegments = std::list<FileSegmentPtr>;
|
||||
struct FileSegmentsHolder;
|
||||
|
||||
/**
|
||||
* Local cache for remote filesystem files, represented as a set of non-overlapping non-empty file segments.
|
||||
*/
|
||||
class FileCache : boost::noncopyable
|
||||
{
|
||||
friend class FileSegment;
|
||||
|
||||
public:
|
||||
using Key = UInt128;
|
||||
|
||||
FileCache(const String & cache_base_path_, size_t max_size_, size_t max_element_size_);
|
||||
|
||||
virtual ~FileCache() = default;
|
||||
|
||||
size_t capacity() const { return max_size; }
|
||||
|
||||
static Key hash(const String & path);
|
||||
|
||||
String path(const Key & key, size_t offset);
|
||||
|
||||
String path(const Key & key);
|
||||
|
||||
/**
|
||||
* Given an `offset` and `size` representing [offset, offset + size) bytes interval,
|
||||
* return list of cached non-overlapping non-empty
|
||||
* file segments `[segment1, ..., segmentN]` which intersect with given interval.
|
||||
*
|
||||
* Segments in returned list are ordered in ascending order and represent a full contiguous
|
||||
* interval (no holes). Each segment in returned list has state: DOWNLOADED, DOWNLOADING or EMPTY.
|
||||
* DOWNLOADING means that either the segment is being downloaded by some other thread or that it
|
||||
* is going to be downloaded by the caller (just space reservation happened).
|
||||
* EMPTY means that the segment not in cache, not being downloaded and cannot be downloaded
|
||||
* by the caller (because of not enough space or max elements limit reached). E.g. returned list is never empty.
|
||||
*
|
||||
* As long as pointers to returned file segments are hold
|
||||
* it is guaranteed that these file segments are not removed from cache.
|
||||
*
|
||||
* If there is no suitable file segment found in cache, create a cache cell for the whole
|
||||
* bytes range [offset, offset + size) as a new file segment and return it with DOWNLOADING state.
|
||||
* If there are some intersecting segments (either DOWNLOADED or DOWNLOADING),
|
||||
* but not the full range (e.g. there are holes), try reserve space for them.
|
||||
* For segments with successfully reserved space - mark their state as DOWNLOADING,
|
||||
* for those which cannot possibly be downloaded mark state as EMPTY.
|
||||
*/
|
||||
virtual FileSegmentsHolder getOrSet(const Key & key, size_t offset, size_t size) = 0;
|
||||
|
||||
virtual void remove(const Key & key) = 0;
|
||||
|
||||
protected:
|
||||
String cache_base_path;
|
||||
size_t max_size = 0;
|
||||
size_t max_element_size = 0;
|
||||
|
||||
mutable std::mutex mutex;
|
||||
|
||||
virtual void remove(
|
||||
const FileSegment & file_segment, [[maybe_unused]] std::lock_guard<std::mutex> & cache_lock) = 0;
|
||||
};
|
||||
|
||||
using FileCachePtr = std::shared_ptr<FileCache>;
|
||||
|
||||
class FileSegment : boost::noncopyable
|
||||
{
|
||||
friend class LRUFileCache;
|
||||
|
||||
public:
|
||||
enum class State
|
||||
{
|
||||
DOWNLOADED,
|
||||
DOWNLOADING,
|
||||
ERROR,
|
||||
EMPTY,
|
||||
};
|
||||
|
||||
FileSegment(size_t offset_, size_t size_, const FileCache::Key & key_, FileCache * cache_, bool empty_ = false)
|
||||
: segment_range(offset_, offset_ + size_ - 1)
|
||||
, download_state(empty_ ? State::EMPTY : State::DOWNLOADING)
|
||||
, downloader(getThreadId()), file_key(key_) , cache(cache_) {}
|
||||
|
||||
/// Represents an interval [left, right] including both boundaries.
|
||||
struct Range
|
||||
{
|
||||
size_t left;
|
||||
size_t right;
|
||||
|
||||
Range(size_t left_, size_t right_) : left(left_), right(right_) {}
|
||||
|
||||
size_t size() const { return right - left + 1; }
|
||||
|
||||
String toString() const { return '[' + std::to_string(left) + ',' + std::to_string(right) + ']'; }
|
||||
};
|
||||
|
||||
State state() const
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
return download_state;
|
||||
}
|
||||
|
||||
void complete(State state);
|
||||
|
||||
State wait();
|
||||
|
||||
void release();
|
||||
|
||||
const Range & range() const { return segment_range; }
|
||||
|
||||
const FileCache::Key & key() const { return file_key; }
|
||||
|
||||
/// State can be DOWNLOADING either if segment is being downloaded by some other thread
|
||||
/// or if current thread should download it. This method allows to tell the caller that
|
||||
/// he is the one who must do the downloading.
|
||||
bool isDownloader() const { return getThreadId() == downloader; }
|
||||
|
||||
static FileSegmentPtr createEmpty(
|
||||
size_t offset, size_t size, const FileCache::Key & key, FileCache * cache)
|
||||
{
|
||||
return std::make_shared<FileSegment>(offset, size, key, cache, true);
|
||||
}
|
||||
|
||||
private:
|
||||
Range segment_range;
|
||||
|
||||
State download_state;
|
||||
UInt64 downloader;
|
||||
|
||||
mutable std::mutex mutex;
|
||||
std::condition_variable cv;
|
||||
|
||||
/// If end up with ERROR state, need to remove cell from cache. In this case cell is
|
||||
/// removed only either by downloader or downloader's by FileSegmentsHolder (in case downloader did not do that).
|
||||
FileCache::Key file_key;
|
||||
FileCache * cache;
|
||||
};
|
||||
|
||||
|
||||
struct FileSegmentsHolder : boost::noncopyable
|
||||
{
|
||||
explicit FileSegmentsHolder(FileSegments && file_segments_) : file_segments(file_segments_) {}
|
||||
FileSegmentsHolder(FileSegmentsHolder && other) : file_segments(std::move(other.file_segments)) {}
|
||||
|
||||
~FileSegmentsHolder()
|
||||
{
|
||||
for (auto & segment : file_segments)
|
||||
{
|
||||
/// Notify with either DOWNLOADED or ERROR.
|
||||
/// In general this must be done manually by downloader by calling segment->complete(state)
|
||||
/// for each segment once it has been downloaded or failed to download.
|
||||
/// But if not done by downloader, downloader's holder will do that.
|
||||
|
||||
if (segment && segment->isDownloader())
|
||||
segment->release();
|
||||
}
|
||||
}
|
||||
|
||||
FileSegments file_segments;
|
||||
};
|
||||
|
||||
|
||||
class LRUFileCache final : public FileCache
|
||||
{
|
||||
public:
|
||||
LRUFileCache(const String & cache_base_path_, size_t max_size_, size_t max_element_size_ = 0);
|
||||
|
||||
FileSegmentsHolder getOrSet(const Key & key, size_t offset, size_t size) override;
|
||||
|
||||
void remove(const Key & key) override;
|
||||
|
||||
private:
|
||||
using FileKeyAndOffset = std::pair<Key, size_t>;
|
||||
using LRUQueue = std::list<FileKeyAndOffset>;
|
||||
using LRUQueueIterator = typename LRUQueue::iterator;
|
||||
|
||||
struct FileSegmentCell : boost::noncopyable
|
||||
{
|
||||
FileSegmentPtr file_segment;
|
||||
LRUQueueIterator queue_iterator;
|
||||
|
||||
bool releasable() const { return file_segment.unique(); }
|
||||
|
||||
size_t size() const { return file_segment->range().size(); }
|
||||
|
||||
FileSegmentCell(FileSegmentPtr file_segment_, LRUQueueIterator && queue_iterator_)
|
||||
: file_segment(file_segment_), queue_iterator(queue_iterator_) {}
|
||||
|
||||
FileSegmentCell(FileSegmentCell && other)
|
||||
: file_segment(std::move(other.file_segment))
|
||||
, queue_iterator(std::move(other.queue_iterator)) {}
|
||||
};
|
||||
|
||||
using FileSegmentsByOffset = std::map<size_t, FileSegmentCell>;
|
||||
using CachedFiles = std::unordered_map<Key, FileSegmentsByOffset>;
|
||||
|
||||
CachedFiles files;
|
||||
LRUQueue queue;
|
||||
size_t current_size = 0;
|
||||
Poco::Logger * log;
|
||||
|
||||
/**
|
||||
* Get list of file segments which intesect with `range`.
|
||||
* If `key` is not in cache or there is not such range, return std::nullopt.
|
||||
*/
|
||||
FileSegments getImpl(
|
||||
const Key & key, const FileSegment::Range & range,
|
||||
[[maybe_unused]] std::lock_guard<std::mutex> & cache_lock);
|
||||
|
||||
/**
|
||||
* Try put file segment of given range in cache. Return nullptr, if unsuccessful.
|
||||
*/
|
||||
FileSegmentCell * setImpl(
|
||||
const Key & key, size_t offset, size_t size,
|
||||
[[maybe_unused]] std::lock_guard<std::mutex> & cache_lock);
|
||||
|
||||
void remove(const FileSegment & file_segment, [[maybe_unused]] std::lock_guard<std::mutex> & cache_lock) override;
|
||||
|
||||
void removeImpl(const Key & key, size_t offset, const LRUQueueIterator & queue_iterator, [[maybe_unused]] std::lock_guard<std::mutex> & cache_lock);
|
||||
|
||||
void removeCell(const Key & key, size_t offset, const LRUQueueIterator & queue_iterator, [[maybe_unused]] std::lock_guard<std::mutex> & cache_lock);
|
||||
|
||||
void useCell(const FileSegmentCell & cell, FileSegments & result, [[maybe_unused]] std::lock_guard<std::mutex> & cache_lock);
|
||||
|
||||
bool tryReserve(size_t size, [[maybe_unused]] std::lock_guard<std::mutex> & cache_lock);
|
||||
|
||||
FileSegmentCell * getCell(const Key & key, size_t offset, [[maybe_unused]] std::lock_guard<std::mutex> & cache_lock);
|
||||
|
||||
size_t available() const { return max_size - current_size; }
|
||||
|
||||
void restore();
|
||||
|
||||
public:
|
||||
struct Stat
|
||||
{
|
||||
size_t size;
|
||||
size_t available;
|
||||
size_t downloaded_size;
|
||||
size_t downloading_size;
|
||||
};
|
||||
|
||||
Stat getStat();
|
||||
};
|
||||
|
||||
}
|
37
src/Common/FileCacheFactory.cpp
Normal file
37
src/Common/FileCacheFactory.cpp
Normal file
@ -0,0 +1,37 @@
|
||||
#include "FileCacheFactory.h"
|
||||
#include "FileCache.h"
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
FileCacheFactory & FileCacheFactory::instance()
|
||||
{
|
||||
static FileCacheFactory ret;
|
||||
return ret;
|
||||
}
|
||||
|
||||
FileCachePtr FileCacheFactory::getImpl(const std::string & cache_base_path, std::lock_guard<std::mutex> &)
|
||||
{
|
||||
auto it = caches.find(cache_base_path);
|
||||
if (it == caches.end())
|
||||
return nullptr;
|
||||
return it->second;
|
||||
}
|
||||
|
||||
FileCachePtr FileCacheFactory::getOrCreate(const std::string & cache_base_path, size_t max_size, size_t max_elements_size)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
auto cache = getImpl(cache_base_path, lock);
|
||||
if (cache)
|
||||
{
|
||||
if (cache->capacity() != max_size)
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cache with path `{}` already exists, but has different max size", cache_base_path);
|
||||
return cache;
|
||||
}
|
||||
|
||||
cache = std::make_shared<LRUFileCache>(cache_base_path, max_size, max_elements_size);
|
||||
caches.emplace(cache_base_path, cache);
|
||||
return cache;
|
||||
}
|
||||
|
||||
}
|
27
src/Common/FileCacheFactory.h
Normal file
27
src/Common/FileCacheFactory.h
Normal file
@ -0,0 +1,27 @@
|
||||
#pragma once
|
||||
|
||||
#include <boost/noncopyable.hpp>
|
||||
#include <unordered_map>
|
||||
#include <Common/FileCache_fwd.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/**
|
||||
* Creates a FileCache object for cache_base_path.
|
||||
*/
|
||||
class FileCacheFactory final : private boost::noncopyable
|
||||
{
|
||||
public:
|
||||
static FileCacheFactory & instance();
|
||||
|
||||
FileCachePtr getOrCreate(const std::string & cache_base_path, size_t max_size, size_t max_elements_size);
|
||||
|
||||
private:
|
||||
FileCachePtr getImpl(const std::string & cache_base_path, std::lock_guard<std::mutex> &);
|
||||
|
||||
std::mutex mutex;
|
||||
std::unordered_map<std::string, FileCachePtr> caches;
|
||||
};
|
||||
|
||||
}
|
9
src/Common/FileCache_fwd.h
Normal file
9
src/Common/FileCache_fwd.h
Normal file
@ -0,0 +1,9 @@
|
||||
#pragma once
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class FileCache;
|
||||
using FileCachePtr = std::shared_ptr<FileCache>;
|
||||
|
||||
}
|
@ -252,6 +252,8 @@
|
||||
\
|
||||
M(RemoteFSReadMicroseconds, "Time of reading from remote filesystem.") \
|
||||
M(RemoteFSReadBytes, "Read bytes from remote filesystem.") \
|
||||
M(RemoteFSCacheReadBytes, "Read bytes from cache of remote filesystem.") \
|
||||
M(RemoteFSCacheDownloadBytes, "Bytes downloaded to cache from remote filesystem.") \
|
||||
\
|
||||
M(RemoteFSSeeks, "Total number of seeks for async buffer") \
|
||||
M(RemoteFSPrefetches, "Number of prefetches made with asynchronous reading from remote filesystem") \
|
||||
|
404
src/Common/tests/gtest_lru_file_cache.cpp
Normal file
404
src/Common/tests/gtest_lru_file_cache.cpp
Normal file
@ -0,0 +1,404 @@
|
||||
#include <iomanip>
|
||||
#include <iostream>
|
||||
#include <gtest/gtest.h>
|
||||
#include <Common/FileCache.h>
|
||||
#include <Common/filesystemHelpers.h>
|
||||
#include <IO/WriteBufferFromFile.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <filesystem>
|
||||
#include <thread>
|
||||
|
||||
namespace fs = std::filesystem;
|
||||
|
||||
String cache_base_path = fs::current_path() / "test_lru_file_cache" / "";
|
||||
|
||||
void assertRange(
|
||||
size_t assert_n, DB::FileSegmentPtr file_segment,
|
||||
const DB::FileSegment::Range & expected_range, DB::FileSegment::State expected_state)
|
||||
{
|
||||
auto range = file_segment->range();
|
||||
|
||||
std::cerr << fmt::format("\nAssert #{} : {} == {} (state: {} == {})\n", assert_n,
|
||||
range.toString(), expected_range.toString(),
|
||||
toString(file_segment->state()), toString(expected_state));
|
||||
|
||||
ASSERT_EQ(range.left, expected_range.left);
|
||||
ASSERT_EQ(range.right, expected_range.right);
|
||||
ASSERT_EQ(file_segment->state(), expected_state);
|
||||
};
|
||||
|
||||
void printRanges(const auto & segments)
|
||||
{
|
||||
std::cerr << "\nHaving file segments: ";
|
||||
for (const auto & segment : segments)
|
||||
std::cerr << '\n' << segment->range().toString() << "\n";
|
||||
}
|
||||
|
||||
std::vector<DB::FileSegmentPtr> fromHolder(const DB::FileSegmentsHolder & holder)
|
||||
{
|
||||
return std::vector<DB::FileSegmentPtr>(holder.file_segments.begin(), holder.file_segments.end());
|
||||
}
|
||||
|
||||
String getFileSegmentPath(const String & base_path, const String & key, size_t offset)
|
||||
{
|
||||
return fs::path(base_path) / key.substr(0, 3) / key / DB::toString(offset);
|
||||
}
|
||||
|
||||
void download(DB::FileSegmentPtr file_segment)
|
||||
{
|
||||
const auto & key = file_segment->key();
|
||||
auto offset = file_segment->range().left;
|
||||
size_t size = file_segment->range().size();
|
||||
|
||||
auto path = getFileSegmentPath(cache_base_path, key, offset);
|
||||
auto subdir = fs::path(cache_base_path) / key.substr(0, 3) / key;
|
||||
if (!fs::exists(subdir))
|
||||
fs::create_directories(subdir);
|
||||
|
||||
DB::WriteBufferFromFile file_buf(path);
|
||||
std::string data(size, '0');
|
||||
DB::writeString(data, file_buf);
|
||||
}
|
||||
|
||||
void complete(const DB::FileSegmentsHolder & holder)
|
||||
{
|
||||
for (const auto & file_segment : holder.file_segments)
|
||||
{
|
||||
download(file_segment);
|
||||
file_segment->complete(DB::FileSegment::State::DOWNLOADED);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
TEST(LRUFileCache, get)
|
||||
{
|
||||
if (fs::exists(cache_base_path))
|
||||
fs::remove_all(cache_base_path);
|
||||
fs::create_directory(cache_base_path);
|
||||
|
||||
auto cache = DB::LRUFileCache(cache_base_path, 30, 5);
|
||||
auto key = "key1";
|
||||
|
||||
{
|
||||
auto holder = cache.getOrSet(key, 0, 10); /// Add range [0, 9]
|
||||
auto segments = fromHolder(holder);
|
||||
/// Range was not present in cache. It should be added in cache as one while file segment.
|
||||
ASSERT_EQ(segments.size(), 1);
|
||||
|
||||
assertRange(1, segments[0], DB::FileSegment::Range(0, 9), DB::FileSegment::State::DOWNLOADING);
|
||||
|
||||
download(segments[0]);
|
||||
segments[0]->complete(DB::FileSegment::State::DOWNLOADED);
|
||||
}
|
||||
|
||||
/// Current cache: [__________]
|
||||
/// ^ ^
|
||||
/// 0 9
|
||||
|
||||
{
|
||||
/// Want range [5, 14], but [0, 9] already in cache, so only [10, 14] will be put in cache.
|
||||
auto holder = cache.getOrSet(key, 5, 10);
|
||||
auto segments = fromHolder(holder);
|
||||
ASSERT_EQ(segments.size(), 2);
|
||||
|
||||
assertRange(2, segments[0], DB::FileSegment::Range(0, 9), DB::FileSegment::State::DOWNLOADED);
|
||||
assertRange(3, segments[1], DB::FileSegment::Range(10, 14), DB::FileSegment::State::DOWNLOADING);
|
||||
|
||||
download(segments[1]);
|
||||
segments[1]->complete(DB::FileSegment::State::DOWNLOADED);
|
||||
}
|
||||
|
||||
/// Current cache: [__________][_____]
|
||||
/// ^ ^^ ^
|
||||
/// 0 910 14
|
||||
|
||||
{
|
||||
auto holder = cache.getOrSet(key, 9, 1); /// Get [9, 9]
|
||||
auto segments = fromHolder(holder);
|
||||
ASSERT_EQ(segments.size(), 1);
|
||||
assertRange(4, segments[0], DB::FileSegment::Range(0, 9), DB::FileSegment::State::DOWNLOADED);
|
||||
}
|
||||
|
||||
{
|
||||
auto holder = cache.getOrSet(key, 9, 2); /// Get [9, 10]
|
||||
auto segments = fromHolder(holder);
|
||||
ASSERT_EQ(segments.size(), 2);
|
||||
assertRange(5, segments[0], DB::FileSegment::Range(0, 9), DB::FileSegment::State::DOWNLOADED);
|
||||
assertRange(6, segments[1], DB::FileSegment::Range(10, 14), DB::FileSegment::State::DOWNLOADED);
|
||||
}
|
||||
|
||||
{
|
||||
auto holder = cache.getOrSet(key, 10, 1); /// Get [10, 10]
|
||||
auto segments = fromHolder(holder);
|
||||
ASSERT_EQ(segments.size(), 1);
|
||||
assertRange(7, segments[0], DB::FileSegment::Range(10, 14), DB::FileSegment::State::DOWNLOADED);
|
||||
}
|
||||
|
||||
complete(cache.getOrSet(key, 17, 4)); /// Get [17, 20]
|
||||
complete(cache.getOrSet(key, 24, 3)); /// Get [24, 26]
|
||||
complete(cache.getOrSet(key, 27, 1)); /// Get [27, 27]
|
||||
|
||||
/// Current cache: [__________][_____] [____] [___][]
|
||||
/// ^ ^^ ^ ^ ^ ^ ^^^
|
||||
/// 0 910 14 17 20 24 2627
|
||||
///
|
||||
|
||||
{
|
||||
auto holder = cache.getOrSet(key, 0, 26); /// Get [0, 25]
|
||||
auto segments = fromHolder(holder);
|
||||
ASSERT_EQ(segments.size(), 6);
|
||||
|
||||
assertRange(8, segments[0], DB::FileSegment::Range(0, 9), DB::FileSegment::State::DOWNLOADED);
|
||||
assertRange(9, segments[1], DB::FileSegment::Range(10, 14), DB::FileSegment::State::DOWNLOADED);
|
||||
|
||||
/// Missing [15, 16] should be added in cache.
|
||||
assertRange(10, segments[2], DB::FileSegment::Range(15, 16), DB::FileSegment::State::DOWNLOADING);
|
||||
download(segments[2]);
|
||||
segments[2]->complete(DB::FileSegment::State::DOWNLOADED);
|
||||
|
||||
assertRange(11, segments[3], DB::FileSegment::Range(17, 20), DB::FileSegment::State::DOWNLOADED);
|
||||
|
||||
/// New [21, 23], but will not be added in cache because of elements limit (5)
|
||||
assertRange(12, segments[4], DB::FileSegment::Range(21, 23), DB::FileSegment::State::EMPTY);
|
||||
|
||||
assertRange(13, segments[5], DB::FileSegment::Range(24, 26), DB::FileSegment::State::DOWNLOADED);
|
||||
|
||||
/// Current cache: [__________][_____][ ][____] [___]
|
||||
/// ^ ^ ^
|
||||
/// 0 20 24
|
||||
///
|
||||
|
||||
/// Range [27, 27] must be evicted in previous getOrSet [0, 25].
|
||||
/// Let's not invalidate pointers to returned segments from range [0, 25] and
|
||||
/// as max elements size is reached, next attempt to put something in cache should fail.
|
||||
/// This will also check that [27, 27] was indeed evicted.
|
||||
|
||||
auto holder1 = cache.getOrSet(key, 27, 1);
|
||||
auto segments_1 = fromHolder(holder1); /// Get [27, 27]
|
||||
ASSERT_EQ(segments_1.size(), 1);
|
||||
assertRange(12, segments_1[0], DB::FileSegment::Range(27, 27), DB::FileSegment::State::EMPTY);
|
||||
}
|
||||
|
||||
{
|
||||
auto holder = cache.getOrSet(key, 12, 10); /// Get [12, 21]
|
||||
auto segments = fromHolder(holder);
|
||||
ASSERT_EQ(segments.size(), 4);
|
||||
|
||||
assertRange(14, segments[0], DB::FileSegment::Range(10, 14), DB::FileSegment::State::DOWNLOADED);
|
||||
assertRange(15, segments[1], DB::FileSegment::Range(15, 16), DB::FileSegment::State::DOWNLOADED);
|
||||
assertRange(16, segments[2], DB::FileSegment::Range(17, 20), DB::FileSegment::State::DOWNLOADED);
|
||||
|
||||
assertRange(17, segments[3], DB::FileSegment::Range(21, 21), DB::FileSegment::State::DOWNLOADING);
|
||||
download(segments[3]);
|
||||
segments[3]->complete(DB::FileSegment::State::DOWNLOADED);
|
||||
}
|
||||
|
||||
/// Current cache: [_____][__][____][_] [___]
|
||||
/// ^ ^ ^ ^ ^
|
||||
/// 10 17 21 24 26
|
||||
|
||||
ASSERT_EQ(cache.getStat().size, 5);
|
||||
|
||||
{
|
||||
auto holder = cache.getOrSet(key, 23, 5); /// Get [23, 28]
|
||||
auto segments = fromHolder(holder);
|
||||
ASSERT_EQ(segments.size(), 3);
|
||||
|
||||
assertRange(18, segments[0], DB::FileSegment::Range(23, 23), DB::FileSegment::State::DOWNLOADING);
|
||||
assertRange(19, segments[1], DB::FileSegment::Range(24, 26), DB::FileSegment::State::DOWNLOADED);
|
||||
assertRange(20, segments[2], DB::FileSegment::Range(27, 27), DB::FileSegment::State::DOWNLOADING);
|
||||
|
||||
download(segments[0]);
|
||||
download(segments[2]);
|
||||
segments[0]->complete(DB::FileSegment::State::DOWNLOADED);
|
||||
segments[2]->complete(DB::FileSegment::State::DOWNLOADED);
|
||||
}
|
||||
|
||||
/// Current cache: [____][_] [][___][__]
|
||||
/// ^ ^ ^^^ ^^ ^
|
||||
/// 17 21 2324 26 28
|
||||
|
||||
{
|
||||
auto holder5 = cache.getOrSet(key, 2, 3); /// Get [2, 4]
|
||||
auto s5 = fromHolder(holder5);
|
||||
ASSERT_EQ(s5.size(), 1);
|
||||
assertRange(21, s5[0], DB::FileSegment::Range(2, 4), DB::FileSegment::State::DOWNLOADING);
|
||||
|
||||
auto holder1 = cache.getOrSet(key, 30, 2); /// Get [30, 31]
|
||||
auto s1 = fromHolder(holder1);
|
||||
ASSERT_EQ(s1.size(), 1);
|
||||
assertRange(22, s1[0], DB::FileSegment::Range(30, 31), DB::FileSegment::State::DOWNLOADING);
|
||||
|
||||
auto holder1_1 = cache.getOrSet(key, 30, 2); /// Get [30, 31] once again.
|
||||
auto s1_1 = fromHolder(holder1);
|
||||
ASSERT_EQ(s1.size(), 1);
|
||||
assertRange(22, s1_1[0], DB::FileSegment::Range(30, 31), DB::FileSegment::State::DOWNLOADING);
|
||||
|
||||
download(s5[0]);
|
||||
download(s1[0]);
|
||||
s5[0]->complete(DB::FileSegment::State::DOWNLOADED);
|
||||
s1[0]->complete(DB::FileSegment::State::DOWNLOADED);
|
||||
|
||||
/// Current cache: [___] [_][___][_] [__]
|
||||
/// ^ ^ ^ ^ ^ ^ ^ ^
|
||||
/// 2 4 23 24 26 27 30 31
|
||||
|
||||
auto holder2 = cache.getOrSet(key, 23, 1); /// Get [23, 23]
|
||||
auto s2 = fromHolder(holder2);
|
||||
ASSERT_EQ(s2.size(), 1);
|
||||
auto holder3 = cache.getOrSet(key, 24, 3); /// Get [24, 26]
|
||||
auto s3 = fromHolder(holder3);
|
||||
ASSERT_EQ(s3.size(), 1);
|
||||
auto holder4 = cache.getOrSet(key, 27, 1); /// Get [27, 27]
|
||||
auto s4 = fromHolder(holder4);
|
||||
ASSERT_EQ(s4.size(), 1);
|
||||
|
||||
/// All cache is now unreleasable because pointers are stil hold
|
||||
auto holder6 = cache.getOrSet(key, 0, 40);
|
||||
auto f = fromHolder(holder6);
|
||||
ASSERT_EQ(f.size(), 9);
|
||||
|
||||
assertRange(23, f[0], DB::FileSegment::Range(0, 1), DB::FileSegment::State::EMPTY);
|
||||
assertRange(24, f[2], DB::FileSegment::Range(5, 22), DB::FileSegment::State::EMPTY);
|
||||
assertRange(25, f[6], DB::FileSegment::Range(28, 29), DB::FileSegment::State::EMPTY);
|
||||
assertRange(26, f[8], DB::FileSegment::Range(32, 39), DB::FileSegment::State::EMPTY);
|
||||
}
|
||||
|
||||
{
|
||||
auto holder = cache.getOrSet(key, 2, 3); /// Get [2, 4]
|
||||
auto segments = fromHolder(holder);
|
||||
ASSERT_EQ(segments.size(), 1);
|
||||
assertRange(27, segments[0], DB::FileSegment::Range(2, 4), DB::FileSegment::State::DOWNLOADED);
|
||||
}
|
||||
|
||||
/// Current cache: [___] [_][___][_] [__]
|
||||
/// ^ ^ ^ ^ ^ ^ ^ ^
|
||||
/// 2 4 23 24 26 27 30 31
|
||||
|
||||
{
|
||||
auto holder = cache.getOrSet(key, 25, 5); /// Get [25, 29]
|
||||
auto segments = fromHolder(holder);
|
||||
ASSERT_EQ(segments.size(), 3);
|
||||
|
||||
assertRange(28, segments[0], DB::FileSegment::Range(24, 26), DB::FileSegment::State::DOWNLOADED);
|
||||
assertRange(29, segments[1], DB::FileSegment::Range(27, 27), DB::FileSegment::State::DOWNLOADED);
|
||||
assertRange(30, segments[2], DB::FileSegment::Range(28, 29), DB::FileSegment::State::DOWNLOADING);
|
||||
|
||||
ASSERT_TRUE(segments[2]->isDownloader());
|
||||
|
||||
bool lets_start_download = false;
|
||||
std::mutex mutex;
|
||||
std::condition_variable cv;
|
||||
|
||||
std::thread other_1([&]
|
||||
{
|
||||
auto holder_2 = cache.getOrSet(key, 25, 5); /// Get [25, 29] once again.
|
||||
auto segments_2 = fromHolder(holder_2);
|
||||
ASSERT_EQ(segments.size(), 3);
|
||||
|
||||
assertRange(31, segments_2[0], DB::FileSegment::Range(24, 26), DB::FileSegment::State::DOWNLOADED);
|
||||
assertRange(32, segments_2[1], DB::FileSegment::Range(27, 27), DB::FileSegment::State::DOWNLOADED);
|
||||
assertRange(33, segments_2[2], DB::FileSegment::Range(28, 29), DB::FileSegment::State::DOWNLOADING);
|
||||
|
||||
ASSERT_TRUE(!segments[2]->isDownloader());
|
||||
ASSERT_TRUE(segments[2]->state() == DB::FileSegment::State::DOWNLOADING);
|
||||
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
lets_start_download = true;
|
||||
}
|
||||
cv.notify_one();
|
||||
|
||||
segments_2[2]->wait();
|
||||
ASSERT_TRUE(segments_2[2]->state() == DB::FileSegment::State::DOWNLOADED);
|
||||
});
|
||||
|
||||
{
|
||||
std::unique_lock lock(mutex);
|
||||
cv.wait(lock, [&]{ return lets_start_download; });
|
||||
}
|
||||
|
||||
download(segments[2]);
|
||||
segments[2]->complete(DB::FileSegment::State::DOWNLOADED);
|
||||
ASSERT_TRUE(segments[2]->state() == DB::FileSegment::State::DOWNLOADED);
|
||||
|
||||
other_1.join();
|
||||
}
|
||||
|
||||
/// Current cache: [___] [___][_][__][__]
|
||||
/// ^ ^ ^ ^ ^^ ^^ ^
|
||||
/// 2 4 24 26 27 2930 31
|
||||
|
||||
{
|
||||
/// Now let's check the similar case but getting ERROR state after segment->wait(), when
|
||||
/// state is changed not manually via segment->complete(state) but from destructor of holder
|
||||
/// and notify_all() is also called from destructor of holder.
|
||||
|
||||
auto holder = cache.getOrSet(key, 3, 23); /// Get [3, 25]
|
||||
auto segments = fromHolder(holder);
|
||||
ASSERT_EQ(segments.size(), 3);
|
||||
|
||||
assertRange(34, segments[0], DB::FileSegment::Range(2, 4), DB::FileSegment::State::DOWNLOADED);
|
||||
assertRange(35, segments[1], DB::FileSegment::Range(5, 23), DB::FileSegment::State::DOWNLOADING);
|
||||
assertRange(36, segments[2], DB::FileSegment::Range(24, 26), DB::FileSegment::State::DOWNLOADED);
|
||||
|
||||
ASSERT_TRUE(segments[1]->isDownloader());
|
||||
|
||||
bool lets_start_download = false;
|
||||
std::mutex mutex;
|
||||
std::condition_variable cv;
|
||||
|
||||
std::thread other_1([&]
|
||||
{
|
||||
auto holder_2 = cache.getOrSet(key, 3, 23); /// Get [3, 25] once again
|
||||
auto segments_2 = fromHolder(holder);
|
||||
ASSERT_EQ(segments_2.size(), 3);
|
||||
|
||||
assertRange(37, segments_2[0], DB::FileSegment::Range(2, 4), DB::FileSegment::State::DOWNLOADED);
|
||||
assertRange(38, segments_2[1], DB::FileSegment::Range(5, 23), DB::FileSegment::State::DOWNLOADING);
|
||||
assertRange(39, segments_2[2], DB::FileSegment::Range(24, 26), DB::FileSegment::State::DOWNLOADED);
|
||||
|
||||
ASSERT_TRUE(!segments_2[1]->isDownloader());
|
||||
ASSERT_TRUE(segments_2[1]->state() == DB::FileSegment::State::DOWNLOADING);
|
||||
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
lets_start_download = true;
|
||||
}
|
||||
cv.notify_one();
|
||||
|
||||
segments_2[1]->wait();
|
||||
ASSERT_TRUE(segments_2[1]->state() == DB::FileSegment::State::ERROR);
|
||||
});
|
||||
|
||||
{
|
||||
std::unique_lock lock(mutex);
|
||||
cv.wait(lock, [&]{ return lets_start_download; });
|
||||
}
|
||||
|
||||
holder.~FileSegmentsHolder();
|
||||
other_1.join();
|
||||
ASSERT_TRUE(segments[1]->state() == DB::FileSegment::State::ERROR);
|
||||
}
|
||||
|
||||
/// Current cache: [___] [___][_][__]
|
||||
/// ^ ^ ^ ^^ ^ ^
|
||||
/// 2 4 24 2627 28 29
|
||||
|
||||
{
|
||||
/// Test LRUCache::restore().
|
||||
|
||||
auto cache2 = DB::LRUFileCache(cache_base_path, 30, 5);
|
||||
ASSERT_EQ(cache2.getStat().size, 4);
|
||||
|
||||
auto holder1 = cache2.getOrSet(key, 2, 3); /// Get [2, 4]
|
||||
auto segments1 = fromHolder(holder1);
|
||||
assertRange(40, segments1[0], DB::FileSegment::Range(2, 4), DB::FileSegment::State::DOWNLOADED);
|
||||
|
||||
auto holder2 = cache2.getOrSet(key, 24, 6); /// Get [24, 29]
|
||||
auto segments2 = fromHolder(holder2);
|
||||
assertRange(41, segments2[0], DB::FileSegment::Range(24, 26), DB::FileSegment::State::DOWNLOADED);
|
||||
assertRange(42, segments2[1], DB::FileSegment::Range(27, 27), DB::FileSegment::State::DOWNLOADED);
|
||||
assertRange(43, segments2[2], DB::FileSegment::Range(28, 29), DB::FileSegment::State::DOWNLOADED);
|
||||
}
|
||||
}
|
@ -53,7 +53,7 @@ DiskAzureBlobStorage::DiskAzureBlobStorage(
|
||||
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> blob_container_client_,
|
||||
SettingsPtr settings_,
|
||||
GetDiskSettings settings_getter_) :
|
||||
IDiskRemote(name_, "", metadata_disk_, "DiskAzureBlobStorage", settings_->thread_pool_size),
|
||||
IDiskRemote(name_, "", metadata_disk_, nullptr, "DiskAzureBlobStorage", settings_->thread_pool_size),
|
||||
blob_container_client(blob_container_client_),
|
||||
current_settings(std::move(settings_)),
|
||||
settings_getter(settings_getter_) {}
|
||||
@ -70,13 +70,11 @@ std::unique_ptr<ReadBufferFromFileBase> DiskAzureBlobStorage::readFile(
|
||||
|
||||
LOG_TEST(log, "Read from file by path: {}", backQuote(metadata_disk->getPath() + path));
|
||||
|
||||
bool threadpool_read = read_settings.remote_fs_method == RemoteFSReadMethod::threadpool;
|
||||
|
||||
auto reader_impl = std::make_unique<ReadBufferFromAzureBlobStorageGather>(
|
||||
path, blob_container_client, metadata, settings->max_single_read_retries,
|
||||
settings->max_single_download_retries, read_settings, threadpool_read);
|
||||
settings->max_single_download_retries, read_settings);
|
||||
|
||||
if (threadpool_read)
|
||||
if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
|
||||
{
|
||||
auto reader = getThreadPoolReader();
|
||||
return std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(reader, read_settings, std::move(reader_impl));
|
||||
|
@ -168,11 +168,9 @@ std::unique_ptr<ReadBufferFromFileBase> DiskWebServer::readFile(const String & p
|
||||
RemoteMetadata meta(path, remote_path);
|
||||
meta.remote_fs_objects.emplace_back(std::make_pair(remote_path, iter->second.size));
|
||||
|
||||
bool threadpool_read = read_settings.remote_fs_method == RemoteFSReadMethod::threadpool;
|
||||
auto web_impl = std::make_unique<ReadBufferFromWebServerGather>(path, url, meta, getContext(), read_settings);
|
||||
|
||||
auto web_impl = std::make_unique<ReadBufferFromWebServerGather>(path, url, meta, getContext(), threadpool_read, read_settings);
|
||||
|
||||
if (threadpool_read)
|
||||
if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
|
||||
{
|
||||
auto reader = IDiskRemote::getThreadPoolReader();
|
||||
return std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(reader, read_settings, std::move(web_impl), min_bytes_for_seek);
|
||||
|
@ -62,7 +62,7 @@ DiskHDFS::DiskHDFS(
|
||||
SettingsPtr settings_,
|
||||
DiskPtr metadata_disk_,
|
||||
const Poco::Util::AbstractConfiguration & config_)
|
||||
: IDiskRemote(disk_name_, hdfs_root_path_, metadata_disk_, "DiskHDFS", settings_->thread_pool_size)
|
||||
: IDiskRemote(disk_name_, hdfs_root_path_, metadata_disk_, nullptr, "DiskHDFS", settings_->thread_pool_size)
|
||||
, config(config_)
|
||||
, hdfs_builder(createHDFSBuilder(hdfs_root_path_, config))
|
||||
, hdfs_fs(createHDFSFS(hdfs_builder.get()))
|
||||
|
@ -280,6 +280,7 @@ IDiskRemote::IDiskRemote(
|
||||
const String & name_,
|
||||
const String & remote_fs_root_path_,
|
||||
DiskPtr metadata_disk_,
|
||||
FileCachePtr cache_,
|
||||
const String & log_name_,
|
||||
size_t thread_pool_size)
|
||||
: IDisk(std::make_unique<AsyncExecutor>(log_name_, thread_pool_size))
|
||||
@ -287,6 +288,7 @@ IDiskRemote::IDiskRemote(
|
||||
, name(name_)
|
||||
, remote_fs_root_path(remote_fs_root_path_)
|
||||
, metadata_disk(metadata_disk_)
|
||||
, cache(cache_)
|
||||
{
|
||||
}
|
||||
|
||||
|
@ -3,6 +3,7 @@
|
||||
#include <Common/config.h>
|
||||
|
||||
#include <atomic>
|
||||
#include <Common/FileCache_fwd.h>
|
||||
#include <Disks/DiskFactory.h>
|
||||
#include <Disks/Executor.h>
|
||||
#include <utility>
|
||||
@ -53,6 +54,7 @@ public:
|
||||
const String & name_,
|
||||
const String & remote_fs_root_path_,
|
||||
DiskPtr metadata_disk_,
|
||||
FileCachePtr cache_,
|
||||
const String & log_name_,
|
||||
size_t thread_pool_size);
|
||||
|
||||
@ -157,6 +159,7 @@ protected:
|
||||
const String remote_fs_root_path;
|
||||
|
||||
DiskPtr metadata_disk;
|
||||
FileCachePtr cache;
|
||||
|
||||
private:
|
||||
void removeMeta(const String & path, RemoteFSPathKeeperPtr fs_paths_keeper);
|
||||
|
@ -44,7 +44,7 @@ public:
|
||||
|
||||
void prefetch() override;
|
||||
|
||||
void setReadUntilPosition(size_t position) override;
|
||||
void setReadUntilPosition(size_t position) override; /// [..., position).
|
||||
|
||||
void setReadUntilEnd() override;
|
||||
|
||||
|
254
src/Disks/IO/CacheableReadBufferFromRemoteFS.cpp
Normal file
254
src/Disks/IO/CacheableReadBufferFromRemoteFS.cpp
Normal file
@ -0,0 +1,254 @@
|
||||
#include "CacheableReadBufferFromRemoteFS.h"
|
||||
#include <IO/createReadBufferFromFileBase.h>
|
||||
#include <filesystem>
|
||||
|
||||
namespace fs = std::filesystem;
|
||||
|
||||
namespace ProfileEvents
|
||||
{
|
||||
extern const Event RemoteFSReadBytes;
|
||||
extern const Event RemoteFSCacheReadBytes;
|
||||
extern const Event RemoteFSCacheDownloadBytes;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int CANNOT_SEEK_THROUGH_FILE;
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
CacheableReadBufferFromRemoteFS::CacheableReadBufferFromRemoteFS(
|
||||
const String & path_,
|
||||
FileCachePtr cache_,
|
||||
SeekableReadBufferPtr reader_,
|
||||
const ReadSettings & settings_,
|
||||
size_t read_until_position_)
|
||||
: SeekableReadBuffer(nullptr, 0)
|
||||
, log(&Poco::Logger::get("CacheableReadBufferFromRemoteFS"))
|
||||
, key(cache_->hash(path_))
|
||||
, cache(cache_)
|
||||
, reader(reader_)
|
||||
, settings(settings_)
|
||||
, read_until_position(read_until_position_)
|
||||
{
|
||||
}
|
||||
|
||||
void CacheableReadBufferFromRemoteFS::initialize(size_t offset, size_t size)
|
||||
{
|
||||
file_segments_holder.emplace(cache->getOrSet(key, offset, size));
|
||||
|
||||
/**
|
||||
* Segments in returned list are ordered in ascending order and represent a full contiguous
|
||||
* interval (no holes). Each segment in returned list has state: DOWNLOADED, DOWNLOADING or EMPTY.
|
||||
* DOWNLOADING means that either the segment is being downloaded by some other thread or that it
|
||||
* is going to be downloaded by the caller (just space reservation happened).
|
||||
* EMPTY means that the segment not in cache, not being downloaded and cannot be downloaded
|
||||
* by the caller (because of not enough space or max elements limit reached). E.g. returned list is never empty.
|
||||
*/
|
||||
if (file_segments_holder->file_segments.empty())
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "List of file segments cannot be empty");
|
||||
|
||||
LOG_TEST(log, "Having {} file segments to read", file_segments_holder->file_segments.size());
|
||||
current_file_segment_it = file_segments_holder->file_segments.begin();
|
||||
|
||||
initialized = true;
|
||||
}
|
||||
|
||||
SeekableReadBufferPtr CacheableReadBufferFromRemoteFS::createReadBuffer(FileSegmentPtr file_segment)
|
||||
{
|
||||
auto range = file_segment->range();
|
||||
LOG_TEST(log, "Current file segment: {}", range.toString());
|
||||
|
||||
assert(!impl || range.left == file_offset_of_buffer_end);
|
||||
|
||||
SeekableReadBufferPtr implementation_buffer;
|
||||
switch (file_segment->state())
|
||||
{
|
||||
case FileSegment::State::DOWNLOADED:
|
||||
{
|
||||
read_type = ReadType::CACHE;
|
||||
implementation_buffer = createReadBufferFromFileBase(cache->path(key, file_offset_of_buffer_end), settings);
|
||||
break;
|
||||
}
|
||||
case FileSegment::State::DOWNLOADING:
|
||||
{
|
||||
if (file_segment->isDownloader())
|
||||
{
|
||||
download_path = cache->path(key, file_offset_of_buffer_end);
|
||||
download_buffer = std::make_unique<WriteBufferFromFile>(download_path);
|
||||
|
||||
read_type = ReadType::REMOTE_FS_READ_AND_DOWNLOAD;
|
||||
implementation_buffer = reader;
|
||||
}
|
||||
else
|
||||
{
|
||||
auto download_state = file_segment->wait();
|
||||
|
||||
if (download_state == FileSegment::State::DOWNLOADED)
|
||||
{
|
||||
read_type = ReadType::CACHE;
|
||||
implementation_buffer = createReadBufferFromFileBase(cache->path(key, file_offset_of_buffer_end), settings);
|
||||
}
|
||||
|
||||
read_type = ReadType::REMOTE_FS_READ;
|
||||
implementation_buffer = reader;
|
||||
}
|
||||
break;
|
||||
}
|
||||
case FileSegment::State::ERROR:
|
||||
case FileSegment::State::EMPTY:
|
||||
{
|
||||
read_type = ReadType::REMOTE_FS_READ;
|
||||
implementation_buffer = reader;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
download_current_segment = read_type == ReadType::REMOTE_FS_READ_AND_DOWNLOAD;
|
||||
|
||||
/// TODO: Add seek avoiding for s3 on the lowest level.
|
||||
implementation_buffer->setReadUntilPosition(range.right + 1); /// [..., range.right]
|
||||
implementation_buffer->seek(range.left, SEEK_SET);
|
||||
|
||||
return implementation_buffer;
|
||||
}
|
||||
|
||||
bool CacheableReadBufferFromRemoteFS::nextImpl()
|
||||
{
|
||||
if (!initialized)
|
||||
initialize(file_offset_of_buffer_end, getTotalSizeToRead());
|
||||
|
||||
if (current_file_segment_it == file_segments_holder->file_segments.end())
|
||||
return false;
|
||||
|
||||
if (impl)
|
||||
{
|
||||
auto current_read_range = (*current_file_segment_it)->range();
|
||||
assert(current_read_range.left <= file_offset_of_buffer_end);
|
||||
|
||||
/// Previous file segment was read till the end.
|
||||
if (file_offset_of_buffer_end > current_read_range.right)
|
||||
{
|
||||
if (download_current_segment)
|
||||
{
|
||||
(*current_file_segment_it)->complete(FileSegment::State::DOWNLOADED);
|
||||
|
||||
download_buffer.reset();
|
||||
download_path.clear();
|
||||
}
|
||||
|
||||
if (++current_file_segment_it == file_segments_holder->file_segments.end())
|
||||
return false;
|
||||
|
||||
impl = createReadBuffer(*current_file_segment_it);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
impl = createReadBuffer(*current_file_segment_it);
|
||||
}
|
||||
|
||||
auto current_read_range = (*current_file_segment_it)->range();
|
||||
size_t remaining_size_to_read = std::min(current_read_range.right, read_until_position - 1) - file_offset_of_buffer_end + 1;
|
||||
|
||||
assert(current_read_range.left <= file_offset_of_buffer_end);
|
||||
assert(current_read_range.right >= file_offset_of_buffer_end);
|
||||
|
||||
swap(*impl);
|
||||
bool result;
|
||||
try
|
||||
{
|
||||
result = impl->next();
|
||||
LOG_TEST(log, "Read {} bytes. Remaining bytes to read = {}", impl->buffer().size(), remaining_size_to_read);
|
||||
|
||||
if (result)
|
||||
{
|
||||
if (download_current_segment)
|
||||
download_buffer->write(working_buffer.begin(), working_buffer.size());
|
||||
}
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
if (download_current_segment)
|
||||
(*current_file_segment_it)->complete(FileSegment::State::ERROR);
|
||||
|
||||
/// Note: If exception happens in another place -- out of scope of this buffer, then
|
||||
/// downloader's FileSegmentsHolder is responsible to set ERROR state and call notify.
|
||||
|
||||
/// (download_path (if exists) is removed from inside cache)
|
||||
throw;
|
||||
}
|
||||
|
||||
if (result)
|
||||
{
|
||||
/// TODO: This resize() is needed only for local fs read, so it is better to
|
||||
/// just implement setReadUntilPosition() for local filesysteam read buffer?
|
||||
impl->buffer().resize(std::min(impl->buffer().size(), remaining_size_to_read));
|
||||
file_offset_of_buffer_end += working_buffer.size();
|
||||
|
||||
switch (read_type)
|
||||
{
|
||||
case ReadType::CACHE:
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::RemoteFSCacheReadBytes, working_buffer.size());
|
||||
break;
|
||||
}
|
||||
case ReadType::REMOTE_FS_READ:
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::RemoteFSReadBytes, working_buffer.size());
|
||||
break;
|
||||
}
|
||||
case ReadType::REMOTE_FS_READ_AND_DOWNLOAD:
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::RemoteFSReadBytes, working_buffer.size());
|
||||
ProfileEvents::increment(ProfileEvents::RemoteFSCacheDownloadBytes, working_buffer.size());
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
swap(*impl);
|
||||
|
||||
LOG_TEST(log, "Returning with {} bytes", working_buffer.size());
|
||||
return result;
|
||||
}
|
||||
|
||||
off_t CacheableReadBufferFromRemoteFS::seek(off_t offset, int whence)
|
||||
{
|
||||
if (initialized)
|
||||
throw Exception(ErrorCodes::CANNOT_SEEK_THROUGH_FILE,
|
||||
"Seek is allowed only before first read attempt from the buffer");
|
||||
|
||||
if (whence != SEEK_SET)
|
||||
throw Exception(ErrorCodes::CANNOT_SEEK_THROUGH_FILE, "Only SEEK_SET allowed");
|
||||
|
||||
file_offset_of_buffer_end = offset;
|
||||
size_t size = getTotalSizeToRead();
|
||||
initialize(offset, size);
|
||||
|
||||
return offset;
|
||||
}
|
||||
|
||||
size_t CacheableReadBufferFromRemoteFS::getTotalSizeToRead()
|
||||
{
|
||||
/// Last position should be guaranteed to be set, as at least we always know file size.
|
||||
if (!read_until_position)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Last position was not set");
|
||||
|
||||
/// On this level should be guaranteed that read size is non-zero.
|
||||
if (file_offset_of_buffer_end >= read_until_position)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Read boundaries mismatch. Expected {} < {}",
|
||||
file_offset_of_buffer_end, read_until_position);
|
||||
|
||||
return read_until_position - file_offset_of_buffer_end;
|
||||
}
|
||||
|
||||
off_t CacheableReadBufferFromRemoteFS::getPosition()
|
||||
{
|
||||
return file_offset_of_buffer_end - available();
|
||||
}
|
||||
|
||||
}
|
62
src/Disks/IO/CacheableReadBufferFromRemoteFS.h
Normal file
62
src/Disks/IO/CacheableReadBufferFromRemoteFS.h
Normal file
@ -0,0 +1,62 @@
|
||||
#pragma once
|
||||
|
||||
#include <Common/FileCache.h>
|
||||
#include <IO/SeekableReadBuffer.h>
|
||||
#include <IO/WriteBufferFromFile.h>
|
||||
#include <IO/ReadSettings.h>
|
||||
#include <base/logger_useful.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class CacheableReadBufferFromRemoteFS : public SeekableReadBuffer
|
||||
{
|
||||
public:
|
||||
CacheableReadBufferFromRemoteFS(
|
||||
const String & path,
|
||||
FileCachePtr cache_,
|
||||
SeekableReadBufferPtr reader_,
|
||||
const ReadSettings & settings_,
|
||||
size_t read_until_position_);
|
||||
|
||||
bool nextImpl() override;
|
||||
|
||||
off_t seek(off_t off, int whence) override;
|
||||
|
||||
off_t getPosition() override;
|
||||
|
||||
private:
|
||||
void initialize(size_t offset, size_t size);
|
||||
SeekableReadBufferPtr createReadBuffer(FileSegmentPtr file_segment);
|
||||
size_t getTotalSizeToRead();
|
||||
|
||||
Poco::Logger * log;
|
||||
FileCache::Key key;
|
||||
FileCachePtr cache;
|
||||
SeekableReadBufferPtr reader;
|
||||
ReadSettings settings;
|
||||
|
||||
size_t read_until_position;
|
||||
size_t file_offset_of_buffer_end = 0;
|
||||
|
||||
std::optional<FileSegmentsHolder> file_segments_holder;
|
||||
FileSegments::iterator current_file_segment_it;
|
||||
|
||||
std::unique_ptr<WriteBufferFromFile> download_buffer;
|
||||
String download_path;
|
||||
|
||||
SeekableReadBufferPtr impl;
|
||||
bool initialized = false;
|
||||
bool download_current_segment = false;
|
||||
|
||||
enum class ReadType
|
||||
{
|
||||
CACHE,
|
||||
REMOTE_FS_READ,
|
||||
REMOTE_FS_READ_AND_DOWNLOAD,
|
||||
};
|
||||
|
||||
ReadType read_type = ReadType::REMOTE_FS_READ;
|
||||
};
|
||||
|
||||
}
|
@ -16,6 +16,7 @@
|
||||
#include <Storages/HDFS/ReadBufferFromHDFS.h>
|
||||
#endif
|
||||
|
||||
#include <Disks/IO/CacheableReadBufferFromRemoteFS.h>
|
||||
#include <base/logger_useful.h>
|
||||
#include <filesystem>
|
||||
#include <iostream>
|
||||
@ -26,33 +27,44 @@ namespace DB
|
||||
{
|
||||
|
||||
#if USE_AWS_S3
|
||||
SeekableReadBufferPtr ReadBufferFromS3Gather::createImplementationBuffer(const String & path, size_t read_until_position_) const
|
||||
SeekableReadBufferPtr ReadBufferFromS3Gather::createImplementationBuffer(const String & path) const
|
||||
{
|
||||
return std::make_unique<ReadBufferFromS3>(client_ptr, bucket,
|
||||
fs::path(metadata.remote_fs_root_path) / path, max_single_read_retries, settings, threadpool_read, read_until_position_);
|
||||
bool use_external_buffer = settings.remote_fs_method == RemoteFSReadMethod::threadpool;
|
||||
|
||||
auto reader = std::make_unique<ReadBufferFromS3>(
|
||||
client_ptr, bucket, fs::path(metadata.remote_fs_root_path) / path, max_single_read_retries,
|
||||
settings, use_external_buffer, read_until_position, true);
|
||||
|
||||
auto cache = settings.remote_fs_cache;
|
||||
if (cache)
|
||||
return std::make_shared<CacheableReadBufferFromRemoteFS>(path, std::move(cache), std::move(reader), settings, read_until_position);
|
||||
|
||||
return std::move(reader);
|
||||
}
|
||||
#endif
|
||||
|
||||
|
||||
#if USE_AZURE_BLOB_STORAGE
|
||||
SeekableReadBufferPtr ReadBufferFromAzureBlobStorageGather::createImplementationBuffer(const String & path, size_t read_until_position_) const
|
||||
SeekableReadBufferPtr ReadBufferFromAzureBlobStorageGather::createImplementationBuffer(const String & path) const
|
||||
{
|
||||
bool use_external_buffer = settings.remote_fs_method == RemoteFSReadMethod::threadpool;
|
||||
return std::make_unique<ReadBufferFromAzureBlobStorage>(blob_container_client, path, max_single_read_retries,
|
||||
max_single_download_retries, settings.remote_fs_buffer_size, threadpool_read, read_until_position_);
|
||||
max_single_download_retries, settings.remote_fs_buffer_size, use_external_buffer, read_until_position);
|
||||
}
|
||||
#endif
|
||||
|
||||
|
||||
SeekableReadBufferPtr ReadBufferFromWebServerGather::createImplementationBuffer(const String & path, size_t read_until_position_) const
|
||||
SeekableReadBufferPtr ReadBufferFromWebServerGather::createImplementationBuffer(const String & path) const
|
||||
{
|
||||
return std::make_unique<ReadBufferFromWebServer>(fs::path(uri) / path, context, settings, threadpool_read, read_until_position_);
|
||||
bool use_external_buffer = settings.remote_fs_method == RemoteFSReadMethod::threadpool;
|
||||
return std::make_unique<ReadBufferFromWebServer>(fs::path(uri) / path, context, settings, use_external_buffer, read_until_position);
|
||||
}
|
||||
|
||||
|
||||
#if USE_HDFS
|
||||
SeekableReadBufferPtr ReadBufferFromHDFSGather::createImplementationBuffer(const String & path, size_t read_until_position_) const
|
||||
SeekableReadBufferPtr ReadBufferFromHDFSGather::createImplementationBuffer(const String & path) const
|
||||
{
|
||||
return std::make_unique<ReadBufferFromHDFS>(hdfs_uri, fs::path(hdfs_directory) / path, config, buf_size, read_until_position_);
|
||||
return std::make_unique<ReadBufferFromHDFS>(hdfs_uri, fs::path(hdfs_directory) / path, config, buf_size);
|
||||
}
|
||||
#endif
|
||||
|
||||
@ -75,8 +87,8 @@ ReadBufferFromRemoteFSGather::ReadResult ReadBufferFromRemoteFSGather::readInto(
|
||||
|
||||
file_offset_of_buffer_end = offset;
|
||||
bytes_to_ignore = ignore;
|
||||
if (bytes_to_ignore)
|
||||
assert(initialized());
|
||||
|
||||
assert(!bytes_to_ignore || initialized());
|
||||
|
||||
auto result = nextImpl();
|
||||
|
||||
@ -100,11 +112,10 @@ void ReadBufferFromRemoteFSGather::initialize()
|
||||
/// Do not create a new buffer if we already have what we need.
|
||||
if (!current_buf || current_buf_idx != i)
|
||||
{
|
||||
current_buf = createImplementationBuffer(file_path, read_until_position);
|
||||
current_buf_idx = i;
|
||||
|
||||
if (auto * in = dynamic_cast<SeekableReadBufferWithSize *>(current_buf.get()))
|
||||
in->setReadType(SeekableReadBufferWithSize::ReadType::DISK_READ);
|
||||
if (!read_until_position)
|
||||
read_until_position = metadata.remote_fs_objects[current_buf_idx].second;
|
||||
current_buf = createImplementationBuffer(file_path);
|
||||
}
|
||||
|
||||
current_buf->seek(current_buf_offset, SEEK_SET);
|
||||
@ -139,8 +150,10 @@ bool ReadBufferFromRemoteFSGather::nextImpl()
|
||||
|
||||
++current_buf_idx;
|
||||
|
||||
const auto & current_path = metadata.remote_fs_objects[current_buf_idx].first;
|
||||
current_buf = createImplementationBuffer(current_path, read_until_position);
|
||||
const auto & [path, size] = metadata.remote_fs_objects[current_buf_idx];
|
||||
if (!read_until_position)
|
||||
read_until_position = size;
|
||||
current_buf = createImplementationBuffer(path);
|
||||
|
||||
return readImpl();
|
||||
}
|
||||
|
@ -52,10 +52,12 @@ public:
|
||||
bool initialized() const { return current_buf != nullptr; }
|
||||
|
||||
protected:
|
||||
virtual SeekableReadBufferPtr createImplementationBuffer(const String & path, size_t read_until_position) const = 0;
|
||||
virtual SeekableReadBufferPtr createImplementationBuffer(const String & path) const = 0;
|
||||
|
||||
RemoteMetadata metadata;
|
||||
|
||||
size_t read_until_position = 0;
|
||||
|
||||
private:
|
||||
bool nextImpl() override;
|
||||
|
||||
@ -76,8 +78,6 @@ private:
|
||||
*/
|
||||
size_t bytes_to_ignore = 0;
|
||||
|
||||
size_t read_until_position = 0;
|
||||
|
||||
String canonical_path;
|
||||
};
|
||||
|
||||
@ -93,25 +93,22 @@ public:
|
||||
const String & bucket_,
|
||||
IDiskRemote::Metadata metadata_,
|
||||
size_t max_single_read_retries_,
|
||||
const ReadSettings & settings_,
|
||||
bool threadpool_read_ = false)
|
||||
const ReadSettings & settings_)
|
||||
: ReadBufferFromRemoteFSGather(metadata_, path_)
|
||||
, client_ptr(std::move(client_ptr_))
|
||||
, bucket(bucket_)
|
||||
, max_single_read_retries(max_single_read_retries_)
|
||||
, settings(settings_)
|
||||
, threadpool_read(threadpool_read_)
|
||||
{
|
||||
}
|
||||
|
||||
SeekableReadBufferPtr createImplementationBuffer(const String & path, size_t read_until_position) const override;
|
||||
SeekableReadBufferPtr createImplementationBuffer(const String & path) const override;
|
||||
|
||||
private:
|
||||
std::shared_ptr<Aws::S3::S3Client> client_ptr;
|
||||
String bucket;
|
||||
UInt64 max_single_read_retries;
|
||||
ReadSettings settings;
|
||||
bool threadpool_read;
|
||||
};
|
||||
#endif
|
||||
|
||||
@ -127,25 +124,22 @@ public:
|
||||
IDiskRemote::Metadata metadata_,
|
||||
size_t max_single_read_retries_,
|
||||
size_t max_single_download_retries_,
|
||||
const ReadSettings & settings_,
|
||||
bool threadpool_read_ = false)
|
||||
const ReadSettings & settings_)
|
||||
: ReadBufferFromRemoteFSGather(metadata_, path_)
|
||||
, blob_container_client(blob_container_client_)
|
||||
, max_single_read_retries(max_single_read_retries_)
|
||||
, max_single_download_retries(max_single_download_retries_)
|
||||
, settings(settings_)
|
||||
, threadpool_read(threadpool_read_)
|
||||
{
|
||||
}
|
||||
|
||||
SeekableReadBufferPtr createImplementationBuffer(const String & path, size_t read_until_position) const override;
|
||||
SeekableReadBufferPtr createImplementationBuffer(const String & path) const override;
|
||||
|
||||
private:
|
||||
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> blob_container_client;
|
||||
size_t max_single_read_retries;
|
||||
size_t max_single_download_retries;
|
||||
ReadSettings settings;
|
||||
bool threadpool_read;
|
||||
};
|
||||
#endif
|
||||
|
||||
@ -158,22 +152,19 @@ public:
|
||||
const String & uri_,
|
||||
RemoteMetadata metadata_,
|
||||
ContextPtr context_,
|
||||
size_t threadpool_read_,
|
||||
const ReadSettings & settings_)
|
||||
: ReadBufferFromRemoteFSGather(metadata_, path_)
|
||||
, uri(uri_)
|
||||
, context(context_)
|
||||
, threadpool_read(threadpool_read_)
|
||||
, settings(settings_)
|
||||
{
|
||||
}
|
||||
|
||||
SeekableReadBufferPtr createImplementationBuffer(const String & path, size_t read_until_position) const override;
|
||||
SeekableReadBufferPtr createImplementationBuffer(const String & path) const override;
|
||||
|
||||
private:
|
||||
String uri;
|
||||
ContextPtr context;
|
||||
bool threadpool_read;
|
||||
ReadSettings settings;
|
||||
};
|
||||
|
||||
@ -198,7 +189,7 @@ public:
|
||||
hdfs_uri = hdfs_uri_.substr(0, begin_of_path);
|
||||
}
|
||||
|
||||
SeekableReadBufferPtr createImplementationBuffer(const String & path, size_t read_until_position) const override;
|
||||
SeekableReadBufferPtr createImplementationBuffer(const String & path) const override;
|
||||
|
||||
private:
|
||||
const Poco::Util::AbstractConfiguration & config;
|
||||
|
@ -6,6 +6,7 @@
|
||||
#include <Common/Stopwatch.h>
|
||||
#include <Common/assert_cast.h>
|
||||
#include <Common/setThreadName.h>
|
||||
#include <Common/CurrentThread.h>
|
||||
|
||||
#include <IO/SeekableReadBuffer.h>
|
||||
|
||||
@ -41,9 +42,16 @@ ThreadPoolRemoteFSReader::ThreadPoolRemoteFSReader(size_t pool_size, size_t queu
|
||||
|
||||
std::future<IAsynchronousReader::Result> ThreadPoolRemoteFSReader::submit(Request request)
|
||||
{
|
||||
auto task = std::make_shared<std::packaged_task<Result()>>([request]
|
||||
ThreadGroupStatusPtr running_group = CurrentThread::isInitialized() && CurrentThread::get().getThreadGroup()
|
||||
? CurrentThread::get().getThreadGroup()
|
||||
: MainThreadStatus::getInstance().getThreadGroup();
|
||||
|
||||
auto task = std::make_shared<std::packaged_task<Result()>>([request, running_group]
|
||||
{
|
||||
ThreadStatus thread_status;
|
||||
thread_status.attachQuery(running_group);
|
||||
setThreadName("VFSRead");
|
||||
|
||||
CurrentMetrics::Increment metric_increment{CurrentMetrics::Read};
|
||||
auto * remote_fs_fd = assert_cast<RemoteFSFileDescriptor *>(request.descriptor.get());
|
||||
|
||||
|
@ -152,10 +152,11 @@ DiskS3::DiskS3(
|
||||
String bucket_,
|
||||
String s3_root_path_,
|
||||
DiskPtr metadata_disk_,
|
||||
FileCachePtr cache_,
|
||||
ContextPtr context_,
|
||||
SettingsPtr settings_,
|
||||
GetDiskSettings settings_getter_)
|
||||
: IDiskRemote(name_, s3_root_path_, metadata_disk_, "DiskS3", settings_->thread_pool_size)
|
||||
: IDiskRemote(name_, s3_root_path_, metadata_disk_, std::move(cache_), "DiskS3", settings_->thread_pool_size)
|
||||
, bucket(std::move(bucket_))
|
||||
, current_settings(std::move(settings_))
|
||||
, settings_getter(settings_getter_)
|
||||
@ -222,20 +223,22 @@ std::unique_ptr<ReadBufferFromFileBase> DiskS3::readFile(const String & path, co
|
||||
LOG_TEST(log, "Read from file by path: {}. Existing S3 objects: {}",
|
||||
backQuote(metadata_disk->getPath() + path), metadata.remote_fs_objects.size());
|
||||
|
||||
bool threadpool_read = read_settings.remote_fs_method == RemoteFSReadMethod::threadpool;
|
||||
ReadSettings disk_read_settings{read_settings};
|
||||
if (cache)
|
||||
disk_read_settings.remote_fs_cache = cache;
|
||||
|
||||
auto s3_impl = std::make_unique<ReadBufferFromS3Gather>(
|
||||
path,
|
||||
settings->client, bucket, metadata,
|
||||
settings->s3_max_single_read_retries, read_settings, threadpool_read);
|
||||
path, settings->client, bucket, metadata,
|
||||
settings->s3_max_single_read_retries, disk_read_settings);
|
||||
|
||||
if (threadpool_read)
|
||||
if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
|
||||
{
|
||||
auto reader = getThreadPoolReader();
|
||||
return std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(reader, read_settings, std::move(s3_impl));
|
||||
return std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(reader, disk_read_settings, std::move(s3_impl));
|
||||
}
|
||||
else
|
||||
{
|
||||
/// TODO: Pass cache for non-asynchronous reader too.
|
||||
auto buf = std::make_unique<ReadIndirectBufferFromRemoteFS>(std::move(s3_impl));
|
||||
return std::make_unique<SeekAvoidingReadBuffer>(std::move(buf), settings->min_bytes_for_seek);
|
||||
}
|
||||
|
@ -17,6 +17,7 @@
|
||||
#include <Poco/DirectoryIterator.h>
|
||||
#include <re2/re2.h>
|
||||
#include <Disks/IDiskRemote.h>
|
||||
#include <Common/FileCache_fwd.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -69,6 +70,7 @@ public:
|
||||
String bucket_,
|
||||
String s3_root_path_,
|
||||
DiskPtr metadata_disk_,
|
||||
FileCachePtr cache_,
|
||||
ContextPtr context_,
|
||||
SettingsPtr settings_,
|
||||
GetDiskSettings settings_getter_);
|
||||
|
@ -19,6 +19,7 @@
|
||||
#include "Disks/DiskRestartProxy.h"
|
||||
#include "Disks/DiskLocal.h"
|
||||
#include "Disks/RemoteDisksCommon.h"
|
||||
#include <Common/FileCacheFactory.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -174,16 +175,40 @@ void registerDiskS3(DiskFactory & factory)
|
||||
ContextPtr context,
|
||||
const DisksMap & /*map*/) -> DiskPtr {
|
||||
S3::URI uri(Poco::URI(config.getString(config_prefix + ".endpoint")));
|
||||
|
||||
if (uri.key.empty())
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "No key in S3 uri: {}", uri.uri.toString());
|
||||
|
||||
if (uri.key.back() != '/')
|
||||
throw Exception("S3 path must ends with '/', but '" + uri.key + "' doesn't.", ErrorCodes::BAD_ARGUMENTS);
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "S3 path must ends with '/', but '{}' doesn't.", uri.key);
|
||||
|
||||
auto [metadata_path, metadata_disk] = prepareForLocalMetadata(name, config, config_prefix, context);
|
||||
|
||||
FileCachePtr cache;
|
||||
bool data_cache_enabled = config.getBool(config_prefix + ".data_cache_enabled", false);
|
||||
if (data_cache_enabled)
|
||||
{
|
||||
auto cache_base_path = config.getString(config_prefix + ".data_cache_path", fs::path(context->getPath()) / "disks" / name / "data_cache/");
|
||||
if (!fs::exists(cache_base_path))
|
||||
fs::create_directories(cache_base_path);
|
||||
|
||||
LOG_INFO(&Poco::Logger::get("Disks3(" + name + ")"), "Disk registered with cache path: {}", cache_base_path);
|
||||
|
||||
if (metadata_path == cache_base_path)
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Metadata path and cache base path must be different: {}", metadata_path);
|
||||
|
||||
size_t max_cache_size = config.getUInt64(config_prefix + ".data_cache_max_size", 1024*1024*1024);
|
||||
size_t max_cache_elements = config.getUInt64(config_prefix + ".data_cache_max_elements", 1024*1024);
|
||||
|
||||
cache = FileCacheFactory::instance().getOrCreate(cache_base_path, max_cache_size, max_cache_elements);
|
||||
}
|
||||
|
||||
std::shared_ptr<IDisk> s3disk = std::make_shared<DiskS3>(
|
||||
name,
|
||||
uri.bucket,
|
||||
uri.key,
|
||||
metadata_disk,
|
||||
std::move(cache),
|
||||
context,
|
||||
getSettings(config, config_prefix, context),
|
||||
getSettings);
|
||||
|
@ -42,7 +42,8 @@ ReadBufferFromS3::ReadBufferFromS3(
|
||||
UInt64 max_single_read_retries_,
|
||||
const ReadSettings & settings_,
|
||||
bool use_external_buffer_,
|
||||
size_t read_until_position_)
|
||||
size_t read_until_position_,
|
||||
bool restricted_seek_)
|
||||
: SeekableReadBufferWithSize(nullptr, 0)
|
||||
, client_ptr(std::move(client_ptr_))
|
||||
, bucket(bucket_)
|
||||
@ -51,6 +52,7 @@ ReadBufferFromS3::ReadBufferFromS3(
|
||||
, read_settings(settings_)
|
||||
, use_external_buffer(use_external_buffer_)
|
||||
, read_until_position(read_until_position_)
|
||||
, restricted_seek(restricted_seek_)
|
||||
{
|
||||
}
|
||||
|
||||
@ -152,8 +154,6 @@ bool ReadBufferFromS3::nextImpl()
|
||||
|
||||
off_t ReadBufferFromS3::seek(off_t offset_, int whence)
|
||||
{
|
||||
bool restricted_seek = read_type == SeekableReadBufferWithSize::ReadType::DISK_READ;
|
||||
|
||||
if (impl && restricted_seek)
|
||||
throw Exception("Seek is allowed only before first read attempt from the buffer.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
|
||||
|
||||
@ -219,6 +219,12 @@ off_t ReadBufferFromS3::getPosition()
|
||||
return offset - available();
|
||||
}
|
||||
|
||||
void ReadBufferFromS3::setReadUntilPosition(size_t position)
|
||||
{
|
||||
read_until_position = position;
|
||||
impl.reset();
|
||||
}
|
||||
|
||||
std::unique_ptr<ReadBuffer> ReadBufferFromS3::initialize()
|
||||
{
|
||||
Aws::S3::Model::GetObjectRequest req;
|
||||
|
@ -31,6 +31,7 @@ private:
|
||||
String key;
|
||||
UInt64 max_single_read_retries;
|
||||
off_t offset = 0;
|
||||
|
||||
Aws::S3::Model::GetObjectResult read_result;
|
||||
std::unique_ptr<ReadBuffer> impl;
|
||||
|
||||
@ -44,7 +45,8 @@ public:
|
||||
UInt64 max_single_read_retries_,
|
||||
const ReadSettings & settings_,
|
||||
bool use_external_buffer = false,
|
||||
size_t read_until_position_ = 0);
|
||||
size_t read_until_position_ = 0,
|
||||
bool restricted_seek_ = false);
|
||||
|
||||
bool nextImpl() override;
|
||||
|
||||
@ -54,6 +56,8 @@ public:
|
||||
|
||||
std::optional<size_t> getTotalSize() override;
|
||||
|
||||
void setReadUntilPosition(size_t position) override;
|
||||
|
||||
private:
|
||||
std::unique_ptr<ReadBuffer> initialize();
|
||||
|
||||
@ -62,6 +66,10 @@ private:
|
||||
bool use_external_buffer;
|
||||
|
||||
off_t read_until_position = 0;
|
||||
|
||||
/// There is different seek policy for disk seek and for non-disk seek
|
||||
/// (non-disk seek is applied for seekable input formats: orc, arrow, parquet).
|
||||
bool restricted_seek;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -3,6 +3,7 @@
|
||||
#include <cstddef>
|
||||
#include <string>
|
||||
#include <Core/Defines.h>
|
||||
#include <Common/FileCache_fwd.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -79,6 +80,8 @@ struct ReadSettings
|
||||
|
||||
size_t remote_read_min_bytes_for_seek = DBMS_DEFAULT_BUFFER_SIZE;
|
||||
|
||||
FileCachePtr remote_fs_cache;
|
||||
|
||||
size_t http_max_tries = 1;
|
||||
size_t http_retry_initial_backoff_ms = 100;
|
||||
size_t http_retry_max_backoff_ms = 1600;
|
||||
|
@ -15,10 +15,14 @@
|
||||
<secret_access_key>minio123</secret_access_key>
|
||||
<s3_max_single_read_retries>10</s3_max_single_read_retries>
|
||||
</unstable_s3>
|
||||
<hdd>
|
||||
<type>local</type>
|
||||
<path>/</path>
|
||||
</hdd>
|
||||
<s3_with_cache>
|
||||
<type>s3</type>
|
||||
<endpoint>http://minio1:9001/root/data/</endpoint>
|
||||
<access_key_id>minio</access_key_id>
|
||||
<secret_access_key>minio123</secret_access_key>
|
||||
<s3_max_single_part_upload_size>33554432</s3_max_single_part_upload_size>
|
||||
<data_cache_enabled>1</data_cache_enabled>
|
||||
</s3_with_cache>
|
||||
</disks>
|
||||
<policies>
|
||||
<s3>
|
||||
@ -26,9 +30,6 @@
|
||||
<main>
|
||||
<disk>s3</disk>
|
||||
</main>
|
||||
<external>
|
||||
<disk>hdd</disk>
|
||||
</external>
|
||||
</volumes>
|
||||
</s3>
|
||||
<unstable_s3>
|
||||
@ -38,6 +39,13 @@
|
||||
</main>
|
||||
</volumes>
|
||||
</unstable_s3>
|
||||
<s3_cache>
|
||||
<volumes>
|
||||
<main>
|
||||
<disk>s3_with_cache</disk>
|
||||
</main>
|
||||
</volumes>
|
||||
</s3_cache>
|
||||
</policies>
|
||||
</storage_configuration>
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user