mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 00:30:49 +00:00
Make userspace page cache work with 'web' disks
This commit is contained in:
parent
cc1ee02459
commit
ab0c601d63
@ -21,10 +21,11 @@ namespace ErrorCodes
|
|||||||
ReadBufferFromWebServer::ReadBufferFromWebServer(
|
ReadBufferFromWebServer::ReadBufferFromWebServer(
|
||||||
const String & url_,
|
const String & url_,
|
||||||
ContextPtr context_,
|
ContextPtr context_,
|
||||||
|
size_t file_size_,
|
||||||
const ReadSettings & settings_,
|
const ReadSettings & settings_,
|
||||||
bool use_external_buffer_,
|
bool use_external_buffer_,
|
||||||
size_t read_until_position_)
|
size_t read_until_position_)
|
||||||
: ReadBufferFromFileBase(settings_.remote_fs_buffer_size, nullptr, 0)
|
: ReadBufferFromFileBase(settings_.remote_fs_buffer_size, nullptr, 0, file_size_)
|
||||||
, log(getLogger("ReadBufferFromWebServer"))
|
, log(getLogger("ReadBufferFromWebServer"))
|
||||||
, context(context_)
|
, context(context_)
|
||||||
, url(url_)
|
, url(url_)
|
||||||
@ -36,7 +37,7 @@ ReadBufferFromWebServer::ReadBufferFromWebServer(
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
std::unique_ptr<ReadBuffer> ReadBufferFromWebServer::initialize()
|
std::unique_ptr<SeekableReadBuffer> ReadBufferFromWebServer::initialize()
|
||||||
{
|
{
|
||||||
Poco::URI uri(url);
|
Poco::URI uri(url);
|
||||||
if (read_until_position)
|
if (read_until_position)
|
||||||
@ -119,9 +120,8 @@ bool ReadBufferFromWebServer::nextImpl()
|
|||||||
|
|
||||||
auto result = impl->next();
|
auto result = impl->next();
|
||||||
|
|
||||||
BufferBase::set(impl->buffer().begin(), impl->buffer().size(), impl->offset());
|
working_buffer = impl->buffer();
|
||||||
|
pos = impl->position();
|
||||||
chassert(working_buffer.begin() == impl->buffer().begin());
|
|
||||||
|
|
||||||
if (result)
|
if (result)
|
||||||
offset += working_buffer.size();
|
offset += working_buffer.size();
|
||||||
@ -132,16 +132,29 @@ bool ReadBufferFromWebServer::nextImpl()
|
|||||||
|
|
||||||
off_t ReadBufferFromWebServer::seek(off_t offset_, int whence)
|
off_t ReadBufferFromWebServer::seek(off_t offset_, int whence)
|
||||||
{
|
{
|
||||||
if (impl)
|
|
||||||
throw Exception(ErrorCodes::CANNOT_SEEK_THROUGH_FILE, "Seek is allowed only before first read attempt from the buffer");
|
|
||||||
|
|
||||||
if (whence != SEEK_SET)
|
if (whence != SEEK_SET)
|
||||||
throw Exception(ErrorCodes::CANNOT_SEEK_THROUGH_FILE, "Only SEEK_SET mode is allowed");
|
throw Exception(ErrorCodes::CANNOT_SEEK_THROUGH_FILE, "Only SEEK_SET mode is allowed");
|
||||||
|
|
||||||
if (offset_ < 0)
|
if (offset_ < 0)
|
||||||
throw Exception(ErrorCodes::SEEK_POSITION_OUT_OF_BOUND, "Seek position is out of bounds. Offset: {}", offset_);
|
throw Exception(ErrorCodes::SEEK_POSITION_OUT_OF_BOUND, "Seek position is out of bounds. Offset: {}", offset_);
|
||||||
|
|
||||||
offset = offset_;
|
if (impl)
|
||||||
|
{
|
||||||
|
if (use_external_buffer)
|
||||||
|
{
|
||||||
|
impl->set(internal_buffer.begin(), internal_buffer.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
impl->seek(offset_, SEEK_SET);
|
||||||
|
|
||||||
|
working_buffer = impl->buffer();
|
||||||
|
pos = impl->position();
|
||||||
|
offset = offset_ + available();
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
offset = offset_;
|
||||||
|
}
|
||||||
|
|
||||||
return offset;
|
return offset;
|
||||||
}
|
}
|
||||||
|
@ -20,6 +20,7 @@ public:
|
|||||||
explicit ReadBufferFromWebServer(
|
explicit ReadBufferFromWebServer(
|
||||||
const String & url_,
|
const String & url_,
|
||||||
ContextPtr context_,
|
ContextPtr context_,
|
||||||
|
size_t file_size_,
|
||||||
const ReadSettings & settings_ = {},
|
const ReadSettings & settings_ = {},
|
||||||
bool use_external_buffer_ = false,
|
bool use_external_buffer_ = false,
|
||||||
size_t read_until_position = 0);
|
size_t read_until_position = 0);
|
||||||
@ -39,7 +40,7 @@ public:
|
|||||||
bool supportsRightBoundedReads() const override { return true; }
|
bool supportsRightBoundedReads() const override { return true; }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
std::unique_ptr<ReadBuffer> initialize();
|
std::unique_ptr<SeekableReadBuffer> initialize();
|
||||||
|
|
||||||
LoggerPtr log;
|
LoggerPtr log;
|
||||||
ContextPtr context;
|
ContextPtr context;
|
||||||
@ -47,7 +48,7 @@ private:
|
|||||||
const String url;
|
const String url;
|
||||||
size_t buf_size;
|
size_t buf_size;
|
||||||
|
|
||||||
std::unique_ptr<ReadBuffer> impl;
|
std::unique_ptr<SeekableReadBuffer> impl;
|
||||||
|
|
||||||
ReadSettings read_settings;
|
ReadSettings read_settings;
|
||||||
|
|
||||||
|
@ -250,13 +250,15 @@ std::unique_ptr<ReadBufferFromFileBase> WebObjectStorage::readObject( /// NOLINT
|
|||||||
std::optional<size_t>,
|
std::optional<size_t>,
|
||||||
std::optional<size_t>) const
|
std::optional<size_t>) const
|
||||||
{
|
{
|
||||||
|
size_t object_size = object.bytes_size;
|
||||||
auto read_buffer_creator =
|
auto read_buffer_creator =
|
||||||
[this, read_settings]
|
[this, read_settings, object_size]
|
||||||
(bool /* restricted_seek */, const std::string & path_) -> std::unique_ptr<ReadBufferFromFileBase>
|
(bool /* restricted_seek */, const std::string & path_) -> std::unique_ptr<ReadBufferFromFileBase>
|
||||||
{
|
{
|
||||||
return std::make_unique<ReadBufferFromWebServer>(
|
return std::make_unique<ReadBufferFromWebServer>(
|
||||||
fs::path(url) / path_,
|
fs::path(url) / path_,
|
||||||
getContext(),
|
getContext(),
|
||||||
|
object_size,
|
||||||
read_settings,
|
read_settings,
|
||||||
/* use_external_buffer */true);
|
/* use_external_buffer */true);
|
||||||
};
|
};
|
||||||
|
Loading…
Reference in New Issue
Block a user