This commit is contained in:
kssenii 2022-01-22 13:41:11 +03:00
parent 9f295ebf92
commit 36a41ac146
8 changed files with 53 additions and 49 deletions

View File

@ -249,6 +249,7 @@ FileSegmentsHolder LRUFileCache::getOrSet(const Key & key, size_t offset, size_t
} }
} }
assert(!file_segments.empty());
return FileSegmentsHolder(std::move(file_segments)); return FileSegmentsHolder(std::move(file_segments));
} }
@ -303,7 +304,7 @@ LRUFileCache::FileSegmentCell * LRUFileCache::setImpl(
if (!size) if (!size)
return nullptr; return nullptr;
LOG_TEST(log, "Set. Key: {}, offset: {}, size: {}", keyToStr(key), offset, size); LOG_TEST(log, "SetImpl. Key: {}, offset: {}, size: {}", keyToStr(key), offset, size);
switch (state) switch (state)
{ {
@ -715,7 +716,7 @@ FileSegment::State FileSegment::wait()
{ {
LOG_TEST(&Poco::Logger::get("kssenii"), "Waiting on: {}", range().toString()); LOG_TEST(&Poco::Logger::get("kssenii"), "Waiting on: {}", range().toString());
cv.wait_for(segment_lock, std::chrono::seconds(60)); cv.wait_for(segment_lock, std::chrono::seconds(60)); /// TODO: use value defined by setting
break; break;
} }
case State::DOWNLOADED:[[fallthrough]]; case State::DOWNLOADED:[[fallthrough]];

View File

@ -182,6 +182,9 @@ struct FileSegmentsHolder : boost::noncopyable
~FileSegmentsHolder() ~FileSegmentsHolder()
{ {
/// CacheableReadBufferFromRemoteFS removes completed file segments from FileSegmentsHolder, so
/// only uncompleted file segments remain here at destruction time.
for (auto & segment : file_segments) for (auto & segment : file_segments)
{ {
/// In general file segment is completed by downloader by calling segment->complete() /// In general file segment is completed by downloader by calling segment->complete()

View File

@ -1,7 +1,9 @@
#include "CacheableReadBufferFromRemoteFS.h" #include "CacheableReadBufferFromRemoteFS.h"
#include <Common/hex.h>
#include <IO/createReadBufferFromFileBase.h> #include <IO/createReadBufferFromFileBase.h>
#include <IO/ReadBufferFromFile.h> #include <IO/ReadBufferFromFile.h>
#include <filesystem>
namespace ProfileEvents namespace ProfileEvents
{ {
@ -32,7 +34,6 @@ CacheableReadBufferFromRemoteFS::CacheableReadBufferFromRemoteFS(
, reader(reader_) , reader(reader_)
, settings(settings_) , settings(settings_)
, read_until_position(read_until_position_) , read_until_position(read_until_position_)
, path(path_)
{ {
} }
@ -132,18 +133,29 @@ SeekableReadBufferPtr CacheableReadBufferFromRemoteFS::createReadBuffer(FileSegm
return implementation_buffer; return implementation_buffer;
} }
void CacheableReadBufferFromRemoteFS::completeFileSegmentAndGetNext() bool CacheableReadBufferFromRemoteFS::completeFileSegmentAndGetNext()
{ {
LOG_TEST(log, "Completed segment: {}", (*current_file_segment_it)->range().toString());
auto file_segment_it = current_file_segment_it++; auto file_segment_it = current_file_segment_it++;
auto range = (*file_segment_it)->range(); auto range = (*file_segment_it)->range();
assert(file_offset_of_buffer_end > range.right); assert(file_offset_of_buffer_end > range.right);
if (download_current_segment) if (download_current_segment)
(*current_file_segment_it)->complete(); (*file_segment_it)->complete();
/// Do not hold pointer to file segment if it is not needed anymore /// Do not hold pointer to file segment if it is not needed anymore
/// so can become releasable and can be evicted from cache. /// so can become releasable and can be evicted from cache.
file_segments_holder->file_segments.erase(file_segment_it); file_segments_holder->file_segments.erase(file_segment_it);
if (current_file_segment_it == file_segments_holder->file_segments.end())
return false;
impl = createReadBuffer(*current_file_segment_it);
LOG_TEST(log, "New segment: {}", (*current_file_segment_it)->range().toString());
return true;
} }
bool CacheableReadBufferFromRemoteFS::nextImpl() bool CacheableReadBufferFromRemoteFS::nextImpl()
@ -154,8 +166,6 @@ bool CacheableReadBufferFromRemoteFS::nextImpl()
if (current_file_segment_it == file_segments_holder->file_segments.end()) if (current_file_segment_it == file_segments_holder->file_segments.end())
return false; return false;
bool new_impl = false;
if (impl) if (impl)
{ {
auto current_read_range = (*current_file_segment_it)->range(); auto current_read_range = (*current_file_segment_it)->range();
@ -163,19 +173,16 @@ bool CacheableReadBufferFromRemoteFS::nextImpl()
if (file_offset_of_buffer_end > current_read_range.right) if (file_offset_of_buffer_end > current_read_range.right)
{ {
completeFileSegmentAndGetNext(); if (!completeFileSegmentAndGetNext())
if (current_file_segment_it == file_segments_holder->file_segments.end())
return false; return false;
impl = createReadBuffer(*current_file_segment_it);
new_impl = true;
} }
} }
else else
{ {
impl = createReadBuffer(*current_file_segment_it); impl = createReadBuffer(*current_file_segment_it);
new_impl = true;
/// Seek is required only for the first file segment in the list of file segments.
impl->seek(file_offset_of_buffer_end, SEEK_SET);
} }
auto current_read_range = (*current_file_segment_it)->range(); auto current_read_range = (*current_file_segment_it)->range();
@ -183,15 +190,10 @@ bool CacheableReadBufferFromRemoteFS::nextImpl()
assert(current_read_range.left <= file_offset_of_buffer_end); assert(current_read_range.left <= file_offset_of_buffer_end);
assert(current_read_range.right >= file_offset_of_buffer_end); assert(current_read_range.right >= file_offset_of_buffer_end);
assert(!internal_buffer.empty());
swap(*impl); swap(*impl);
if (new_impl)
{
LOG_TEST(log, "SEEK TO {}", file_offset_of_buffer_end);
impl->seek(file_offset_of_buffer_end, SEEK_SET);
}
bool result; bool result;
auto & file_segment = *current_file_segment_it; auto & file_segment = *current_file_segment_it;
@ -217,7 +219,7 @@ bool CacheableReadBufferFromRemoteFS::nextImpl()
file_segment->complete(); file_segment->complete();
/// Note: If exception happens in another place -- out of scope of this buffer, then /// Note: If exception happens in another place -- out of scope of this buffer, then
/// downloader's FileSegmentsHolder is responsible to set ERROR state and call notify. /// downloader's FileSegmentsHolder is responsible to call file_segment->complete().
/// (download_path (if exists) is removed from inside cache) /// (download_path (if exists) is removed from inside cache)
throw; throw;
@ -257,8 +259,8 @@ bool CacheableReadBufferFromRemoteFS::nextImpl()
if (file_offset_of_buffer_end > current_read_range.right) if (file_offset_of_buffer_end > current_read_range.right)
completeFileSegmentAndGetNext(); completeFileSegmentAndGetNext();
LOG_TEST(log, "Returning with {} bytes, last range: {}, current offset: {}", LOG_TEST(log, "Key: {}. Returning with {} bytes, current range: {}, current offset: {}",
working_buffer.size(), current_read_range.toString(), file_offset_of_buffer_end); getHexUIntLowercase(key), working_buffer.size(), current_read_range.toString(), file_offset_of_buffer_end);
return result; return result;
} }
@ -298,14 +300,4 @@ off_t CacheableReadBufferFromRemoteFS::getPosition()
return file_offset_of_buffer_end - available(); return file_offset_of_buffer_end - available();
} }
CacheableReadBufferFromRemoteFS::~CacheableReadBufferFromRemoteFS()
{
std::optional<FileSegment::Range> range;
if (download_current_segment
&& current_file_segment_it != file_segments_holder->file_segments.end())
range = (*current_file_segment_it)->range();
LOG_TEST(log, "Buffer reset. Current offset: {}, last download range: {}, state: {}",
file_offset_of_buffer_end, range ? range->toString() : "None", (*current_file_segment_it)->state());
}
} }

View File

@ -25,8 +25,6 @@ public:
off_t getPosition() override; off_t getPosition() override;
~CacheableReadBufferFromRemoteFS() override;
private: private:
void initialize(size_t offset, size_t size); void initialize(size_t offset, size_t size);
@ -34,7 +32,7 @@ private:
SeekableReadBufferPtr createReadBuffer(FileSegmentPtr file_segment); SeekableReadBufferPtr createReadBuffer(FileSegmentPtr file_segment);
size_t getTotalSizeToRead(); size_t getTotalSizeToRead();
void completeFileSegmentAndGetNext(); bool completeFileSegmentAndGetNext();
Poco::Logger * log; Poco::Logger * log;
FileCache::Key key; FileCache::Key key;

View File

@ -27,8 +27,9 @@ namespace DB
{ {
#if USE_AWS_S3 #if USE_AWS_S3
SeekableReadBufferPtr ReadBufferFromS3Gather::createImplementationBuffer(const String & path) const SeekableReadBufferPtr ReadBufferFromS3Gather::createImplementationBuffer(const String & path)
{ {
current_path = path;
bool use_external_buffer = settings.remote_fs_method == RemoteFSReadMethod::threadpool; bool use_external_buffer = settings.remote_fs_method == RemoteFSReadMethod::threadpool;
auto reader = std::make_unique<ReadBufferFromS3>( auto reader = std::make_unique<ReadBufferFromS3>(
@ -49,8 +50,9 @@ SeekableReadBufferPtr ReadBufferFromS3Gather::createImplementationBuffer(const S
#if USE_AZURE_BLOB_STORAGE #if USE_AZURE_BLOB_STORAGE
SeekableReadBufferPtr ReadBufferFromAzureBlobStorageGather::createImplementationBuffer(const String & path) const SeekableReadBufferPtr ReadBufferFromAzureBlobStorageGather::createImplementationBuffer(const String & path)
{ {
current_path = path;
bool use_external_buffer = settings.remote_fs_method == RemoteFSReadMethod::threadpool; bool use_external_buffer = settings.remote_fs_method == RemoteFSReadMethod::threadpool;
return std::make_unique<ReadBufferFromAzureBlobStorage>(blob_container_client, path, max_single_read_retries, return std::make_unique<ReadBufferFromAzureBlobStorage>(blob_container_client, path, max_single_read_retries,
max_single_download_retries, settings.remote_fs_buffer_size, use_external_buffer, read_until_position); max_single_download_retries, settings.remote_fs_buffer_size, use_external_buffer, read_until_position);
@ -58,15 +60,16 @@ SeekableReadBufferPtr ReadBufferFromAzureBlobStorageGather::createImplementation
#endif #endif
SeekableReadBufferPtr ReadBufferFromWebServerGather::createImplementationBuffer(const String & path) const SeekableReadBufferPtr ReadBufferFromWebServerGather::createImplementationBuffer(const String & path)
{ {
current_path = path;
bool use_external_buffer = settings.remote_fs_method == RemoteFSReadMethod::threadpool; bool use_external_buffer = settings.remote_fs_method == RemoteFSReadMethod::threadpool;
return std::make_unique<ReadBufferFromWebServer>(fs::path(uri) / path, context, settings, use_external_buffer, read_until_position); return std::make_unique<ReadBufferFromWebServer>(fs::path(uri) / path, context, settings, use_external_buffer, read_until_position);
} }
#if USE_HDFS #if USE_HDFS
SeekableReadBufferPtr ReadBufferFromHDFSGather::createImplementationBuffer(const String & path) const SeekableReadBufferPtr ReadBufferFromHDFSGather::createImplementationBuffer(const String & path)
{ {
return std::make_unique<ReadBufferFromHDFS>(hdfs_uri, fs::path(hdfs_directory) / path, config, buf_size); return std::make_unique<ReadBufferFromHDFS>(hdfs_uri, fs::path(hdfs_directory) / path, config, buf_size);
} }
@ -135,6 +138,8 @@ void ReadBufferFromRemoteFSGather::initialize()
bool ReadBufferFromRemoteFSGather::nextImpl() bool ReadBufferFromRemoteFSGather::nextImpl()
{ {
assert(!internal_buffer.empty());
/// Find first available buffer that fits to given offset. /// Find first available buffer that fits to given offset.
if (!current_buf) if (!current_buf)
initialize(); initialize();
@ -210,7 +215,7 @@ void ReadBufferFromRemoteFSGather::reset()
String ReadBufferFromRemoteFSGather::getFileName() const String ReadBufferFromRemoteFSGather::getFileName() const
{ {
return canonical_path; return current_path;
} }

View File

@ -52,12 +52,14 @@ public:
bool initialized() const { return current_buf != nullptr; } bool initialized() const { return current_buf != nullptr; }
protected: protected:
virtual SeekableReadBufferPtr createImplementationBuffer(const String & path) const = 0; virtual SeekableReadBufferPtr createImplementationBuffer(const String & path) = 0;
RemoteMetadata metadata; RemoteMetadata metadata;
size_t read_until_position = 0; size_t read_until_position = 0;
String current_path;
private: private:
bool nextImpl() override; bool nextImpl() override;
@ -102,7 +104,7 @@ public:
{ {
} }
SeekableReadBufferPtr createImplementationBuffer(const String & path) const override; SeekableReadBufferPtr createImplementationBuffer(const String & path) override;
private: private:
std::shared_ptr<Aws::S3::S3Client> client_ptr; std::shared_ptr<Aws::S3::S3Client> client_ptr;
@ -133,7 +135,7 @@ public:
{ {
} }
SeekableReadBufferPtr createImplementationBuffer(const String & path) const override; SeekableReadBufferPtr createImplementationBuffer(const String & path) override;
private: private:
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> blob_container_client; std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> blob_container_client;
@ -160,7 +162,7 @@ public:
{ {
} }
SeekableReadBufferPtr createImplementationBuffer(const String & path) const override; SeekableReadBufferPtr createImplementationBuffer(const String & path) override;
private: private:
String uri; String uri;
@ -189,7 +191,7 @@ public:
hdfs_uri = hdfs_uri_.substr(0, begin_of_path); hdfs_uri = hdfs_uri_.substr(0, begin_of_path);
} }
SeekableReadBufferPtr createImplementationBuffer(const String & path) const override; SeekableReadBufferPtr createImplementationBuffer(const String & path) override;
private: private:
const Poco::Util::AbstractConfiguration & config; const Poco::Util::AbstractConfiguration & config;

View File

@ -13,7 +13,9 @@ namespace ErrorCodes
ReadIndirectBufferFromRemoteFS::ReadIndirectBufferFromRemoteFS( ReadIndirectBufferFromRemoteFS::ReadIndirectBufferFromRemoteFS(
std::shared_ptr<ReadBufferFromRemoteFSGather> impl_) : impl(std::move(impl_)) std::shared_ptr<ReadBufferFromRemoteFSGather> impl_)
: ReadBufferFromFileBase(DBMS_DEFAULT_BUFFER_SIZE, nullptr, 0)
, impl(std::move(impl_))
{ {
} }
@ -72,6 +74,8 @@ off_t ReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence)
bool ReadIndirectBufferFromRemoteFS::nextImpl() bool ReadIndirectBufferFromRemoteFS::nextImpl()
{ {
assert(!internal_buffer.empty());
/// Transfer current position and working_buffer to actual ReadBuffer /// Transfer current position and working_buffer to actual ReadBuffer
swap(*impl); swap(*impl);
/// Position and working_buffer will be updated in next() call /// Position and working_buffer will be updated in next() call

View File

@ -238,7 +238,6 @@ std::unique_ptr<ReadBufferFromFileBase> DiskS3::readFile(const String & path, co
} }
else else
{ {
/// TODO: Pass cache for non-asynchronous reader too.
auto buf = std::make_unique<ReadIndirectBufferFromRemoteFS>(std::move(s3_impl)); auto buf = std::make_unique<ReadIndirectBufferFromRemoteFS>(std::move(s3_impl));
return std::make_unique<SeekAvoidingReadBuffer>(std::move(buf), settings->min_bytes_for_seek); return std::make_unique<SeekAvoidingReadBuffer>(std::move(buf), settings->min_bytes_for_seek);
} }