mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-17 13:13:36 +00:00
Lazy seek avoiding
This commit is contained in:
parent
6044725baa
commit
1f8b449bc5
@ -173,8 +173,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskWebServer::readFile(const String & p
|
||||
if (threadpool_read)
|
||||
{
|
||||
auto reader = IDiskRemote::getThreadPoolReader();
|
||||
auto buf = std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(reader, read_settings.priority, std::move(web_impl));
|
||||
return std::make_unique<SeekAvoidingReadBuffer>(std::move(buf), min_bytes_for_seek);
|
||||
return std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(reader, read_settings.priority, std::move(web_impl));
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -55,18 +55,28 @@ ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(const RemoteMetadata
|
||||
}
|
||||
|
||||
|
||||
size_t ReadBufferFromRemoteFSGather::readInto(char * data, size_t size, size_t offset)
|
||||
size_t ReadBufferFromRemoteFSGather::readInto(char * data, size_t size, size_t offset, size_t ignore)
|
||||
{
|
||||
/**
|
||||
* Set `data` to current working and internal buffers.
|
||||
* Internal buffer with size `size`. Working buffer with size 0.
|
||||
*/
|
||||
set(data, size);
|
||||
|
||||
absolute_position = offset;
|
||||
bytes_to_ignore = ignore;
|
||||
|
||||
auto result = nextImpl();
|
||||
bytes_to_ignore = 0;
|
||||
|
||||
if (result)
|
||||
return working_buffer.size();
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
SeekableReadBufferPtr ReadBufferFromRemoteFSGather::initialize()
|
||||
void ReadBufferFromRemoteFSGather::initialize()
|
||||
{
|
||||
/// One clickhouse file can be split into multiple files in remote fs.
|
||||
auto current_buf_offset = absolute_position;
|
||||
@ -77,14 +87,20 @@ SeekableReadBufferPtr ReadBufferFromRemoteFSGather::initialize()
|
||||
|
||||
if (size > current_buf_offset)
|
||||
{
|
||||
auto buf = createImplementationBuffer(file_path);
|
||||
buf->seek(current_buf_offset, SEEK_SET);
|
||||
return buf;
|
||||
/// Do not create a new buffer if we already have what we need.
|
||||
if (!current_buf || buf_idx != i)
|
||||
{
|
||||
current_buf = createImplementationBuffer(file_path);
|
||||
buf_idx = i;
|
||||
}
|
||||
|
||||
current_buf->seek(current_buf_offset, SEEK_SET);
|
||||
return;
|
||||
}
|
||||
|
||||
current_buf_offset -= size;
|
||||
}
|
||||
return nullptr;
|
||||
current_buf = nullptr;
|
||||
}
|
||||
|
||||
|
||||
@ -92,7 +108,7 @@ bool ReadBufferFromRemoteFSGather::nextImpl()
|
||||
{
|
||||
/// Find first available buffer that fits to given offset.
|
||||
if (!current_buf)
|
||||
current_buf = initialize();
|
||||
initialize();
|
||||
|
||||
/// If current buffer has remaining data - use it.
|
||||
if (current_buf)
|
||||
@ -119,7 +135,17 @@ bool ReadBufferFromRemoteFSGather::nextImpl()
|
||||
bool ReadBufferFromRemoteFSGather::readImpl()
|
||||
{
|
||||
swap(*current_buf);
|
||||
|
||||
/**
|
||||
* Lazy seek is performed here.
|
||||
* In asynchronous buffer when seeking to offset in range [pos, pos + min_bytes_for_seek]
|
||||
* we save how many bytes need to be ignored (new_offset - position() bytes).
|
||||
*/
|
||||
if (bytes_to_ignore)
|
||||
current_buf->ignore(bytes_to_ignore);
|
||||
|
||||
auto result = current_buf->next();
|
||||
|
||||
swap(*current_buf);
|
||||
|
||||
if (result)
|
||||
@ -129,6 +155,13 @@ bool ReadBufferFromRemoteFSGather::readImpl()
|
||||
}
|
||||
|
||||
|
||||
void ReadBufferFromRemoteFSGather::seek(off_t offset)
|
||||
{
|
||||
absolute_position = offset;
|
||||
initialize();
|
||||
}
|
||||
|
||||
|
||||
void ReadBufferFromRemoteFSGather::reset()
|
||||
{
|
||||
current_buf.reset();
|
||||
|
@ -13,9 +13,7 @@ namespace Aws
|
||||
namespace S3
|
||||
{
|
||||
class S3Client;
|
||||
}
|
||||
}
|
||||
|
||||
}}
|
||||
namespace DB
|
||||
{
|
||||
|
||||
@ -31,8 +29,10 @@ public:
|
||||
|
||||
void reset();
|
||||
|
||||
void seek(off_t offset); /// SEEK_SET only.
|
||||
|
||||
protected:
|
||||
size_t readInto(char * data, size_t size, size_t offset);
|
||||
size_t readInto(char * data, size_t size, size_t offset, size_t ignore = 0);
|
||||
|
||||
virtual SeekableReadBufferPtr createImplementationBuffer(const String & path) const = 0;
|
||||
|
||||
@ -41,7 +41,7 @@ protected:
|
||||
private:
|
||||
bool nextImpl() override;
|
||||
|
||||
SeekableReadBufferPtr initialize();
|
||||
void initialize();
|
||||
|
||||
bool readImpl();
|
||||
|
||||
@ -50,6 +50,10 @@ private:
|
||||
size_t current_buf_idx = 0;
|
||||
|
||||
size_t absolute_position = 0;
|
||||
|
||||
size_t buf_idx = 0;
|
||||
|
||||
size_t bytes_to_ignore = 0;
|
||||
};
|
||||
|
||||
|
||||
|
@ -31,13 +31,17 @@ namespace ErrorCodes
|
||||
|
||||
|
||||
AsynchronousReadIndirectBufferFromRemoteFS::AsynchronousReadIndirectBufferFromRemoteFS(
|
||||
AsynchronousReaderPtr reader_, Int32 priority_,
|
||||
std::shared_ptr<ReadBufferFromRemoteFSGather> impl_, size_t buf_size_)
|
||||
AsynchronousReaderPtr reader_,
|
||||
Int32 priority_,
|
||||
std::shared_ptr<ReadBufferFromRemoteFSGather> impl_,
|
||||
size_t buf_size_,
|
||||
size_t min_bytes_for_seek_)
|
||||
: ReadBufferFromFileBase(buf_size_, nullptr, 0)
|
||||
, reader(reader_)
|
||||
, priority(priority_)
|
||||
, impl(impl_)
|
||||
, prefetch_buffer(buf_size_)
|
||||
, min_bytes_for_seek(min_bytes_for_seek_)
|
||||
{
|
||||
}
|
||||
|
||||
@ -50,12 +54,21 @@ std::future<IAsynchronousReader::Result> AsynchronousReadIndirectBufferFromRemot
|
||||
request.size = size;
|
||||
request.offset = absolute_position;
|
||||
request.priority = priority;
|
||||
|
||||
if (bytes_to_ignore)
|
||||
{
|
||||
request.ignore = bytes_to_ignore;
|
||||
bytes_to_ignore = 0;
|
||||
}
|
||||
return reader->submit(request);
|
||||
}
|
||||
|
||||
|
||||
void AsynchronousReadIndirectBufferFromRemoteFS::prefetch()
|
||||
{
|
||||
if (hasPendingData())
|
||||
return;
|
||||
|
||||
if (prefetch_future.valid())
|
||||
return;
|
||||
|
||||
@ -156,7 +169,19 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence
|
||||
}
|
||||
|
||||
pos = working_buffer.end();
|
||||
impl->reset();
|
||||
|
||||
if (static_cast<off_t>(absolute_position) >= getPosition()
|
||||
&& static_cast<off_t>(absolute_position) < getPosition() + static_cast<off_t>(min_bytes_for_seek))
|
||||
{
|
||||
/**
|
||||
* Lazy ignore. Save number of bytes to ignore and ignore it either for prefetch buffer or current buffer.
|
||||
*/
|
||||
bytes_to_ignore = absolute_position - getPosition();
|
||||
}
|
||||
else
|
||||
{
|
||||
impl->seek(absolute_position); /// SEEK_SET.
|
||||
}
|
||||
|
||||
return absolute_position;
|
||||
}
|
||||
|
@ -34,7 +34,8 @@ public:
|
||||
explicit AsynchronousReadIndirectBufferFromRemoteFS(
|
||||
AsynchronousReaderPtr reader_, Int32 priority_,
|
||||
std::shared_ptr<ReadBufferFromRemoteFSGather> impl_,
|
||||
size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE);
|
||||
size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE,
|
||||
size_t min_bytes_for_seek = 1024 * 1024);
|
||||
|
||||
~AsynchronousReadIndirectBufferFromRemoteFS() override;
|
||||
|
||||
@ -66,6 +67,9 @@ private:
|
||||
Memory<> prefetch_buffer;
|
||||
|
||||
String buffer_events;
|
||||
|
||||
size_t min_bytes_for_seek;
|
||||
size_t bytes_to_ignore = 0;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -46,6 +46,7 @@ public:
|
||||
size_t size = 0;
|
||||
char * buf = nullptr;
|
||||
int64_t priority = 0;
|
||||
size_t ignore = 0;
|
||||
};
|
||||
|
||||
/// Less than requested amount of data can be returned.
|
||||
|
@ -28,9 +28,9 @@ namespace CurrentMetrics
|
||||
namespace DB
|
||||
{
|
||||
|
||||
size_t ThreadPoolRemoteFSReader::RemoteFSFileDescriptor::readInto(char * data, size_t size, size_t offset)
|
||||
size_t ThreadPoolRemoteFSReader::RemoteFSFileDescriptor::readInto(char * data, size_t size, size_t offset, size_t ignore)
|
||||
{
|
||||
return reader->readInto(data, size, offset);
|
||||
return reader->readInto(data, size, offset, ignore);
|
||||
}
|
||||
|
||||
|
||||
@ -49,7 +49,7 @@ std::future<IAsynchronousReader::Result> ThreadPoolRemoteFSReader::submit(Reques
|
||||
auto * remote_fs_fd = assert_cast<RemoteFSFileDescriptor *>(request.descriptor.get());
|
||||
|
||||
Stopwatch watch(CLOCK_MONOTONIC);
|
||||
auto bytes_read = remote_fs_fd->readInto(request.buf, request.size, request.offset);
|
||||
auto bytes_read = remote_fs_fd->readInto(request.buf, request.size, request.offset, request.ignore);
|
||||
watch.stop();
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::RemoteFSReadMicroseconds, watch.elapsedMicroseconds());
|
||||
|
@ -30,7 +30,7 @@ struct ThreadPoolRemoteFSReader::RemoteFSFileDescriptor : public IFileDescriptor
|
||||
public:
|
||||
RemoteFSFileDescriptor(std::shared_ptr<ReadBufferFromRemoteFSGather> reader_) : reader(reader_) {}
|
||||
|
||||
size_t readInto(char * data, size_t size, size_t offset);
|
||||
size_t readInto(char * data, size_t size, size_t offset, size_t ignore = 0);
|
||||
|
||||
private:
|
||||
std::shared_ptr<ReadBufferFromRemoteFSGather> reader;
|
||||
|
Loading…
Reference in New Issue
Block a user