This commit is contained in:
kssenii 2021-09-24 13:38:08 +03:00
parent 1b9565dfa8
commit c1ea44b9c2
3 changed files with 22 additions and 17 deletions

View File

@ -36,6 +36,13 @@ std::future<IAsynchronousReader::Result> AsynchronousReadIndirectBufferFromRemot
{
IAsynchronousReader::Request request;
impl->set(impl->buffer().begin(), impl->buffer().size());
if (impl->initialized())
{
impl->position() = impl->buffer().end();
assert(!impl->hasPendingData());
}
auto remote_fd = std::make_shared<ThreadPoolRemoteFSReader::RemoteFSFileDescriptor>();
remote_fd->impl = impl;
@ -51,11 +58,6 @@ void AsynchronousReadIndirectBufferFromRemoteFS::prefetch()
if (prefetch_future.valid())
return;
if (impl->initialized())
{
impl->position() = impl->buffer().end(); /// May be should try to do this differently.
assert(!impl->hasPendingData());
}
prefetch_future = readNext();
}
@ -74,17 +76,19 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
}
else
{
if (impl->initialized())
{
impl->position() = position();
assert(!impl->hasPendingData());
}
// if (impl->initialized())
// {
// impl->position() = position();
// assert(!impl->hasPendingData());
// }
size = readNext().get();
}
if (size)
{
BufferBase::set(impl->buffer().begin(), impl->buffer().size(), impl->offset());
//set(working_buffer.begin(), working_buffer.size());
swap(*impl);
// BufferBase::set(impl->buffer().begin(), impl->buffer().size(), impl->offset());
impl->absolute_position += working_buffer.size();
}
@ -138,8 +142,9 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence
prefetch_future = {};
}
impl->seek(impl->absolute_position, SEEK_SET);
// impl->seek(impl->absolute_position, SEEK_SET);
pos = working_buffer.end();
impl->reset();
return impl->absolute_position;
}

View File

@ -85,17 +85,16 @@ off_t ReadBufferFromRemoteFS::seek([[maybe_unused]] off_t offset_, int whence)
{
if (whence != SEEK_SET)
throw Exception(ErrorCodes::CANNOT_SEEK_THROUGH_FILE, "Only SEEK_SET is allowed");
/// We already made a seek and adjusted position in ReadIndirectBufferFromRemoteFS.
assert(offset_ == static_cast<off_t>(absolute_position));
current_buf = initialize();
// current_buf = initialize();
return absolute_position;
}
void ReadBufferFromRemoteFS::reset()
{
set(nullptr, 0);
current_buf.reset();
// set(nullptr, 0);
}
}

View File

@ -48,7 +48,8 @@ bool ReadBufferFromS3::nextImpl()
if (impl)
{
/// `impl` has been initialized earlier and now we're at the end of the current portion of data.
impl->position() = position();
// impl->position() = position();
impl->set(working_buffer.begin(), working_buffer.size());
assert(!impl->hasPendingData());
}
else