Fix threadpool reads

This commit is contained in:
kssenii 2022-01-22 01:07:20 +03:00
parent 1858f69f5a
commit 9f295ebf92

View File

@ -1,5 +1,7 @@
#include "CacheableReadBufferFromRemoteFS.h"
#include <IO/createReadBufferFromFileBase.h>
#include <IO/ReadBufferFromFile.h>
#include <filesystem>
namespace ProfileEvents
{
@ -24,12 +26,13 @@ CacheableReadBufferFromRemoteFS::CacheableReadBufferFromRemoteFS(
const ReadSettings & settings_,
size_t read_until_position_)
: SeekableReadBuffer(nullptr, 0)
, log(&Poco::Logger::get("CacheableReadBufferFromRemoteFS" + path_ + ""))
, log(&Poco::Logger::get("CacheableReadBufferFromRemoteFS(" + path_ + ")"))
, key(cache_->hash(path_))
, cache(cache_)
, reader(reader_)
, settings(settings_)
, read_until_position(read_until_position_)
, path(path_)
{
}
@ -40,10 +43,6 @@ void CacheableReadBufferFromRemoteFS::initialize(size_t offset, size_t size)
/**
* Segments in returned list are ordered in ascending order and represent a full contiguous
* interval (no holes). Each segment in returned list has state: DOWNLOADED, DOWNLOADING or EMPTY.
* DOWNLOADING means that either the segment is being downloaded by some other thread or that it
* is going to be downloaded by the caller (just space reservation happened).
* EMPTY means that the segment not in cache, not being downloaded and cannot be downloaded
* by the caller (because of not enough space or max elements limit reached). E.g. returned list is never empty.
*/
if (file_segments_holder->file_segments.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "List of file segments cannot be empty");
@ -56,7 +55,7 @@ void CacheableReadBufferFromRemoteFS::initialize(size_t offset, size_t size)
SeekableReadBufferPtr CacheableReadBufferFromRemoteFS::createCacheReadBuffer(size_t offset) const
{
return createReadBufferFromFileBase(cache->path(key, offset), settings);
return std::make_shared<ReadBufferFromFile>(cache->path(key, offset), settings.local_fs_buffer_size);
}
SeekableReadBufferPtr CacheableReadBufferFromRemoteFS::createReadBuffer(FileSegmentPtr file_segment)
@ -129,7 +128,6 @@ SeekableReadBufferPtr CacheableReadBufferFromRemoteFS::createReadBuffer(FileSegm
/// TODO: Add seek avoiding for s3 on the lowest level.
implementation_buffer->setReadUntilPosition(range.right + 1); /// [..., range.right]
implementation_buffer->seek(range.left, SEEK_SET);
return implementation_buffer;
}
@ -156,6 +154,8 @@ bool CacheableReadBufferFromRemoteFS::nextImpl()
if (current_file_segment_it == file_segments_holder->file_segments.end())
return false;
bool new_impl = false;
if (impl)
{
auto current_read_range = (*current_file_segment_it)->range();
@ -169,11 +169,13 @@ bool CacheableReadBufferFromRemoteFS::nextImpl()
return false;
impl = createReadBuffer(*current_file_segment_it);
new_impl = true;
}
}
else
{
impl = createReadBuffer(*current_file_segment_it);
new_impl = true;
}
auto current_read_range = (*current_file_segment_it)->range();
@ -184,6 +186,12 @@ bool CacheableReadBufferFromRemoteFS::nextImpl()
swap(*impl);
if (new_impl)
{
LOG_TEST(log, "SEEK TO {}", file_offset_of_buffer_end);
impl->seek(file_offset_of_buffer_end, SEEK_SET);
}
bool result;
auto & file_segment = *current_file_segment_it;
@ -251,6 +259,7 @@ bool CacheableReadBufferFromRemoteFS::nextImpl()
LOG_TEST(log, "Returning with {} bytes, last range: {}, current offset: {}",
working_buffer.size(), current_read_range.toString(), file_offset_of_buffer_end);
return result;
}