mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-26 17:41:59 +00:00
Improve performance in case without prefetch by avoiding swapping of memory buffers
This commit is contained in:
parent
384b4304c0
commit
5243315227
@ -24,6 +24,19 @@ std::string AsynchronousReadBufferFromFileDescriptor::getFileName() const
|
||||
}
|
||||
|
||||
|
||||
std::future<IAsynchronousReader::Result> AsynchronousReadBufferFromFileDescriptor::readInto(char * data, size_t size)
|
||||
{
|
||||
IAsynchronousReader::Request request;
|
||||
request.descriptor = std::make_shared<IAsynchronousReader::LocalFileDescriptor>(fd);
|
||||
request.buf = data;
|
||||
request.size = size;
|
||||
request.offset = file_offset_of_buffer_end;
|
||||
request.priority = priority;
|
||||
|
||||
return reader->submit(request);
|
||||
}
|
||||
|
||||
|
||||
void AsynchronousReadBufferFromFileDescriptor::prefetch()
|
||||
{
|
||||
if (prefetch_future.valid())
|
||||
@ -31,37 +44,46 @@ void AsynchronousReadBufferFromFileDescriptor::prefetch()
|
||||
|
||||
/// Will request the same amount of data that is read in nextImpl.
|
||||
prefetch_buffer.resize(internal_buffer.size());
|
||||
|
||||
IAsynchronousReader::Request request;
|
||||
request.descriptor = std::make_shared<IAsynchronousReader::LocalFileDescriptor>(fd);
|
||||
request.buf = prefetch_buffer.data();
|
||||
request.size = prefetch_buffer.size();
|
||||
request.offset = file_offset_of_buffer_end;
|
||||
request.priority = priority;
|
||||
|
||||
prefetch_future = reader->submit(request);
|
||||
prefetch_future = readInto(prefetch_buffer.data(), prefetch_buffer.size());
|
||||
}
|
||||
|
||||
|
||||
bool AsynchronousReadBufferFromFileDescriptor::nextImpl()
|
||||
{
|
||||
if (!prefetch_future.valid())
|
||||
prefetch();
|
||||
|
||||
auto size = prefetch_future.get();
|
||||
prefetch_future = {};
|
||||
|
||||
file_offset_of_buffer_end += size;
|
||||
|
||||
if (size)
|
||||
if (prefetch_future.valid())
|
||||
{
|
||||
prefetch_buffer.swap(memory);
|
||||
set(memory.data(), memory.size());
|
||||
working_buffer.resize(size);
|
||||
return true;
|
||||
}
|
||||
/// Read request already in flight. Wait for its completion.
|
||||
|
||||
return false;
|
||||
auto size = prefetch_future.get();
|
||||
prefetch_future = {};
|
||||
file_offset_of_buffer_end += size;
|
||||
|
||||
if (size)
|
||||
{
|
||||
prefetch_buffer.swap(memory);
|
||||
set(memory.data(), memory.size());
|
||||
working_buffer.resize(size);
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
else
|
||||
{
|
||||
/// No pending request. Do synchronous read.
|
||||
|
||||
auto size = readInto(memory.data(), memory.size()).get();
|
||||
file_offset_of_buffer_end += size;
|
||||
|
||||
if (size)
|
||||
{
|
||||
set(memory.data(), memory.size());
|
||||
working_buffer.resize(size);
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -38,7 +38,7 @@ public:
|
||||
AsynchronousReaderPtr reader_, Int32 priority_,
|
||||
int fd_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, char * existing_memory = nullptr, size_t alignment = 0)
|
||||
: ReadBufferFromFileBase(buf_size, existing_memory, alignment),
|
||||
reader(std::move(reader_)), priority(priority_), prefetch_buffer(buf_size, alignment), required_alignment(alignment), fd(fd_)
|
||||
reader(std::move(reader_)), priority(priority_), required_alignment(alignment), fd(fd_)
|
||||
{
|
||||
}
|
||||
|
||||
@ -61,6 +61,9 @@ public:
|
||||
|
||||
/// Seek to the beginning, discarding already read data if any. Useful to reread file that changes on every read.
|
||||
void rewind();
|
||||
|
||||
private:
|
||||
std::future<IAsynchronousReader::Result> readInto(char * data, size_t size);
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -110,7 +110,7 @@ public:
|
||||
else if (errno == EAGAIN)
|
||||
{
|
||||
/// Data is not available.
|
||||
std::cerr << "miss\n";
|
||||
//std::cerr << "miss\n";
|
||||
break;
|
||||
}
|
||||
else if (errno == EINTR)
|
||||
@ -134,7 +134,7 @@ public:
|
||||
|
||||
if (bytes_read)
|
||||
{
|
||||
std::cerr << "hit\n";
|
||||
//std::cerr << "hit\n";
|
||||
promise.set_value(bytes_read);
|
||||
return future;
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user