Improve performance

This commit is contained in:
Alexey Milovidov 2021-08-04 03:07:04 +03:00
parent 425bf5d301
commit 91173b8934
7 changed files with 81 additions and 200 deletions

View File

@ -26,7 +26,7 @@ std::string AsynchronousReadBufferFromFileDescriptor::getFileName() const
void AsynchronousReadBufferFromFileDescriptor::prefetch() void AsynchronousReadBufferFromFileDescriptor::prefetch()
{ {
if (prefetch_request_id) if (prefetch_future.valid())
return; return;
/// Will request the same amount of data that is read in nextImpl. /// Will request the same amount of data that is read in nextImpl.
@ -38,28 +38,25 @@ void AsynchronousReadBufferFromFileDescriptor::prefetch()
request.size = prefetch_buffer.size(); request.size = prefetch_buffer.size();
request.offset = file_offset_of_buffer_end; request.offset = file_offset_of_buffer_end;
prefetch_request_id = reader->submit(request); prefetch_future = reader->submit(request);
} }
bool AsynchronousReadBufferFromFileDescriptor::nextImpl() bool AsynchronousReadBufferFromFileDescriptor::nextImpl()
{ {
if (!prefetch_request_id) if (!prefetch_future.valid())
prefetch(); prefetch();
auto response = reader->wait(*prefetch_request_id, {}); auto size = prefetch_future.get();
prefetch_request_id.reset(); prefetch_future = {};
if (response->exception) file_offset_of_buffer_end += size;
std::rethrow_exception(response->exception);
file_offset_of_buffer_end += response->size; if (size)
if (response->size)
{ {
prefetch_buffer.swap(memory); prefetch_buffer.swap(memory);
set(memory.data(), memory.size()); set(memory.data(), memory.size());
working_buffer.resize(response->size); working_buffer.resize(size);
return true; return true;
} }
@ -69,10 +66,10 @@ bool AsynchronousReadBufferFromFileDescriptor::nextImpl()
void AsynchronousReadBufferFromFileDescriptor::finalize() void AsynchronousReadBufferFromFileDescriptor::finalize()
{ {
if (prefetch_request_id) if (prefetch_future.valid())
{ {
reader->wait(*prefetch_request_id, {}); prefetch_future.wait();
prefetch_request_id.reset(); prefetch_future = {};
} }
} }
@ -105,26 +102,25 @@ off_t AsynchronousReadBufferFromFileDescriptor::seek(off_t offset, int whence)
if (new_pos + (working_buffer.end() - pos) == file_offset_of_buffer_end) if (new_pos + (working_buffer.end() - pos) == file_offset_of_buffer_end)
return new_pos; return new_pos;
/// file_offset_of_buffer_end corresponds to working_buffer.end(); it's a past-the-end pos,
/// so the second inequality is strict.
if (file_offset_of_buffer_end - working_buffer.size() <= static_cast<size_t>(new_pos) if (file_offset_of_buffer_end - working_buffer.size() <= static_cast<size_t>(new_pos)
&& new_pos < file_offset_of_buffer_end) && new_pos <= file_offset_of_buffer_end)
{ {
/// Position is still inside the buffer. /// Position is still inside the buffer.
/// The position may be exactly at the end of the buffer; in that case data will be loaded on the following 'next' call.
pos = working_buffer.end() - file_offset_of_buffer_end + new_pos; pos = working_buffer.end() - file_offset_of_buffer_end + new_pos;
assert(pos >= working_buffer.begin()); assert(pos >= working_buffer.begin());
assert(pos < working_buffer.end()); assert(pos <= working_buffer.end());
return new_pos; return new_pos;
} }
else else
{ {
if (prefetch_request_id) if (prefetch_future.valid())
{ {
std::cerr << "Ignoring prefetched data" << "\n"; std::cerr << "Ignoring prefetched data" << "\n";
reader->wait(*prefetch_request_id, {}); prefetch_future.wait();
prefetch_request_id.reset(); prefetch_future = {};
} }
/// Position is out of the buffer, we need to do real seek. /// Position is out of the buffer, we need to do real seek.
@ -151,10 +147,10 @@ off_t AsynchronousReadBufferFromFileDescriptor::seek(off_t offset, int whence)
void AsynchronousReadBufferFromFileDescriptor::rewind() void AsynchronousReadBufferFromFileDescriptor::rewind()
{ {
if (prefetch_request_id) if (prefetch_future.valid())
{ {
reader->wait(*prefetch_request_id, {}); prefetch_future.wait();
prefetch_request_id.reset(); prefetch_future = {};
} }
/// Clearing the buffer with existing data. New data will be read on subsequent call to 'next'. /// Clearing the buffer with existing data. New data will be read on subsequent call to 'next'.

View File

@ -19,7 +19,7 @@ protected:
AsynchronousReaderPtr reader; AsynchronousReaderPtr reader;
Memory<> prefetch_buffer; Memory<> prefetch_buffer;
std::optional<IAsynchronousReader::RequestID> prefetch_request_id; std::future<IAsynchronousReader::Result> prefetch_future;
const size_t required_alignment = 0; /// For O_DIRECT both file offsets and memory addresses have to be aligned. const size_t required_alignment = 0; /// For O_DIRECT both file offsets and memory addresses have to be aligned.
size_t file_offset_of_buffer_end = 0; /// What offset in file corresponds to working_buffer.end(). size_t file_offset_of_buffer_end = 0; /// What offset in file corresponds to working_buffer.end().

View File

@ -3,6 +3,7 @@
#include <Core/Types.h> #include <Core/Types.h>
#include <optional> #include <optional>
#include <memory> #include <memory>
#include <future>
namespace DB namespace DB
@ -20,8 +21,6 @@ namespace DB
class IAsynchronousReader class IAsynchronousReader
{ {
public: public:
using RequestID = UInt64;
/// For local filesystems, the file descriptor is simply integer /// For local filesystems, the file descriptor is simply integer
/// but it can be arbitrary opaque object for remote filesystems. /// but it can be arbitrary opaque object for remote filesystems.
struct IFileDescriptor struct IFileDescriptor
@ -50,30 +49,15 @@ public:
}; };
/// Less than requested amount of data can be returned. /// Less than requested amount of data can be returned.
/// Also error can be returned in 'exception'. /// If size is zero - the file has ended.
/// If no error, and the size is zero - the file has ended.
/// (for example, EINTR must be handled by implementation automatically) /// (for example, EINTR must be handled by implementation automatically)
struct Result using Result = size_t;
{
size_t size = 0;
std::exception_ptr exception;
};
/// The methods 'submit' and 'wait' both can be called concurrently from multiple threads
/// but only for different requests.
/// Submit request and obtain a handle. This method doesn't perform any waits. /// Submit request and obtain a handle. This method doesn't perform any waits.
/// If this method did not throw, the caller must wait for the result with 'wait' method /// If this method did not throw, the caller must wait for the result with 'wait' method
/// or destroy the whole reader before destroying the buffer for request. /// or destroy the whole reader before destroying the buffer for request.
virtual RequestID submit(Request request) = 0; /// The method can be called concurrently from multiple threads.
virtual std::future<Result> submit(Request request) = 0;
/// Wait for request completion or timeout.
/// Optional timeout can be specified, otherwise waits until completion.
/// In case of timeout, nullopt is returned.
/// In case of completion, Result object is returned. Result may contain exception.
/// In case of timeout, the caller must call wait again until completion
/// or destroy the whole reader before destroying the buffer for request.
virtual std::optional<Result> wait(RequestID id, std::optional<UInt64> microseconds) = 0;
/// Destructor must wait for all incomplete requests and ignore the results. /// Destructor must wait for all incomplete requests and ignore the results.
/// It may also cancel the requests. /// It may also cancel the requests.

View File

@ -143,16 +143,15 @@ off_t ReadBufferFromFileDescriptor::seek(off_t offset, int whence)
if (new_pos + (working_buffer.end() - pos) == file_offset_of_buffer_end) if (new_pos + (working_buffer.end() - pos) == file_offset_of_buffer_end)
return new_pos; return new_pos;
/// file_offset_of_buffer_end corresponds to working_buffer.end(); it's a past-the-end pos,
/// so the second inequality is strict.
if (file_offset_of_buffer_end - working_buffer.size() <= static_cast<size_t>(new_pos) if (file_offset_of_buffer_end - working_buffer.size() <= static_cast<size_t>(new_pos)
&& new_pos < file_offset_of_buffer_end) && new_pos <= file_offset_of_buffer_end)
{ {
/// Position is still inside the buffer. /// Position is still inside the buffer.
/// The position may be exactly at the end of the buffer; in that case data will be loaded on the following 'next' call.
pos = working_buffer.end() - file_offset_of_buffer_end + new_pos; pos = working_buffer.end() - file_offset_of_buffer_end + new_pos;
assert(pos >= working_buffer.begin()); assert(pos >= working_buffer.begin());
assert(pos < working_buffer.end()); assert(pos <= working_buffer.end());
return new_pos; return new_pos;
} }

View File

@ -25,13 +25,8 @@ namespace ErrorCodes
*/ */
class SynchronousReader final : public IAsynchronousReader class SynchronousReader final : public IAsynchronousReader
{ {
private:
UInt64 counter = 0;
std::unordered_map<UInt64, Request> requests;
std::mutex mutex;
public: public:
RequestID submit(Request request) override std::future<Result> submit(Request request) override
{ {
#if defined(POSIX_FADV_WILLNEED) #if defined(POSIX_FADV_WILLNEED)
int fd = assert_cast<const LocalFileDescriptor &>(*request.descriptor).fd; int fd = assert_cast<const LocalFileDescriptor &>(*request.descriptor).fd;
@ -39,30 +34,8 @@ public:
throwFromErrno("Cannot posix_fadvise", ErrorCodes::CANNOT_ADVISE); throwFromErrno("Cannot posix_fadvise", ErrorCodes::CANNOT_ADVISE);
#endif #endif
std::lock_guard lock(mutex); return std::async(std::launch::deferred, [fd, request]
++counter;
requests.emplace(counter, request);
return counter;
}
/// Timeout is not implemented.
std::optional<Result> wait(RequestID id, std::optional<UInt64>) override
{ {
Request request;
Result result;
{
std::lock_guard lock(mutex);
auto it = requests.find(id);
if (it == requests.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot find request by id {}", id);
request = it->second;
requests.erase(it);
}
int fd = assert_cast<const LocalFileDescriptor &>(*request.descriptor).fd;
/// TODO Instrumentation. /// TODO Instrumentation.
size_t bytes_read = 0; size_t bytes_read = 0;
@ -73,20 +46,14 @@ public:
break; break;
if (-1 == res && errno != EINTR) if (-1 == res && errno != EINTR)
{ throwFromErrno(fmt::format("Cannot read from file {}", fd), ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR);
result.exception = std::make_exception_ptr(ErrnoException(
fmt::format("Cannot read from file {}, {}", fd,
errnoToString(ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR, errno)),
ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR, errno));
return result;
}
if (res > 0) if (res > 0)
bytes_read += res; bytes_read += res;
} }
result.size = bytes_read; return bytes_read;
return result; });
} }
~SynchronousReader() override ~SynchronousReader() override

View File

@ -7,8 +7,7 @@
#include <Common/setThreadName.h> #include <Common/setThreadName.h>
#include <common/errnoToString.h> #include <common/errnoToString.h>
#include <Poco/Event.h> #include <Poco/Event.h>
#include <unordered_map> #include <future>
#include <mutex>
#include <unistd.h> #include <unistd.h>
#include <fcntl.h> #include <fcntl.h>
@ -60,46 +59,18 @@ namespace ErrorCodes
class ThreadPoolReader final : public IAsynchronousReader class ThreadPoolReader final : public IAsynchronousReader
{ {
private: private:
UInt64 counter = 0;
struct RequestInfo
{
bool already_read = false;
Poco::Event event;
Result result;
};
using Requests = std::unordered_map<UInt64, RequestInfo>;
Requests requests;
std::mutex mutex;
ThreadPool pool; ThreadPool pool;
size_t queue_size;
public: public:
ThreadPoolReader(size_t pool_size, size_t queue_size_) ThreadPoolReader(size_t pool_size, size_t queue_size_)
: pool(pool_size, pool_size, queue_size_), queue_size(queue_size_) : pool(pool_size, pool_size, queue_size_)
{ {
} }
RequestID submit(Request request) override std::future<Result> submit(Request request) override
{ {
Requests::iterator it;
{
std::lock_guard lock(mutex);
if (requests.size() >= queue_size)
throw Exception("Too many read requests in flight", ErrorCodes::CANNOT_SCHEDULE_TASK);
++counter;
it = requests.emplace(std::piecewise_construct, std::forward_as_tuple(counter), std::forward_as_tuple()).first;
}
int fd = assert_cast<const LocalFileDescriptor &>(*request.descriptor).fd; int fd = assert_cast<const LocalFileDescriptor &>(*request.descriptor).fd;
RequestInfo & info = it->second;
#if defined(__linux__) #if defined(__linux__)
/// Check if data is already in page cache with preadv2 syscall. /// Check if data is already in page cache with preadv2 syscall.
@ -110,6 +81,9 @@ public:
if (has_pread_nowait_support.load(std::memory_order_relaxed)) if (has_pread_nowait_support.load(std::memory_order_relaxed))
{ {
std::promise<Result> promise;
std::future<Result> future = promise.get_future();
size_t bytes_read = 0; size_t bytes_read = 0;
while (!bytes_read) while (!bytes_read)
{ {
@ -120,10 +94,12 @@ public:
static_cast<long>(request.offset), static_cast<long>(request.offset >> 32), static_cast<long>(request.offset), static_cast<long>(request.offset >> 32),
RWF_NOWAIT); RWF_NOWAIT);
//ssize_t res = ::pread(fd, request.buf, request.size, request.offset);
if (!res) if (!res)
{ {
info.already_read = true; promise.set_value(0);
break; return future;
} }
if (-1 == res) if (-1 == res)
@ -144,28 +120,28 @@ public:
} }
else else
{ {
info.already_read = true; promise.set_exception(std::make_exception_ptr(ErrnoException(
info.result.exception = std::make_exception_ptr(ErrnoException(
fmt::format("Cannot read from file {}, {}", fd, fmt::format("Cannot read from file {}, {}", fd,
errnoToString(ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR, errno)), errnoToString(ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR, errno)),
ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR, errno)); ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR, errno)));
break; return future;
} }
} }
else else
{ {
bytes_read += res; bytes_read += res;
info.already_read = true;
} }
} }
info.result.size = bytes_read; if (bytes_read)
{
promise.set_value(bytes_read);
return future;
}
} }
#endif #endif
if (!info.already_read) auto task = std::make_shared<std::packaged_task<Result()>>([request, fd]
{
pool.scheduleOrThrow([request, fd, &info]
{ {
setThreadName("ThreadPoolRead"); setThreadName("ThreadPoolRead");
@ -179,57 +155,17 @@ public:
break; break;
if (-1 == res && errno != EINTR) if (-1 == res && errno != EINTR)
{ throwFromErrno(fmt::format("Cannot read from file {}", fd), ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR);
info.result.exception = std::make_exception_ptr(ErrnoException(
fmt::format("Cannot read from file {}, {}", fd,
errnoToString(ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR, errno)),
ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR, errno));
break;
}
bytes_read += res; bytes_read += res;
} }
info.result.size = bytes_read; return bytes_read;
info.event.set(); });
},
request.priority);
}
return it->first; auto future = task->get_future();
} pool.scheduleOrThrow([task]{ (*task)(); }, request.priority);
return future;
std::optional<Result> wait(RequestID id, std::optional<UInt64> microseconds) override
{
Result result;
Requests::iterator it;
{
std::lock_guard lock(mutex);
it = requests.find(id);
if (it == requests.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot find request by id {}", id);
}
if (!it->second.already_read)
{
if (microseconds)
{
if (!it->second.event.tryWait(*microseconds / 1000))
return {};
}
else
it->second.event.wait();
}
Result res = it->second.result;
{
std::lock_guard lock(mutex);
requests.erase(it);
}
return res;
} }
~ThreadPoolReader() override ~ThreadPoolReader() override

View File

@ -4,7 +4,6 @@
#include <IO/AsynchronousReadBufferFromFile.h> #include <IO/AsynchronousReadBufferFromFile.h>
#include <IO/ThreadPoolReader.h> #include <IO/ThreadPoolReader.h>
#include <IO/SynchronousReader.h> #include <IO/SynchronousReader.h>
#include <IO/AIOReader.h>
#include <Common/ProfileEvents.h> #include <Common/ProfileEvents.h>