mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-29 21:20:49 +00:00
Experiment with checking if data is in page cache
This commit is contained in:
parent
2a0cba2b9f
commit
462c89e6f8
@ -12,6 +12,23 @@
|
|||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
#include <fcntl.h>
|
#include <fcntl.h>
|
||||||
|
|
||||||
|
#if defined(__linux__)
|
||||||
|
|
||||||
|
#include <sys/syscall.h>
|
||||||
|
#include <sys/uio.h>
|
||||||
|
|
||||||
|
/// We don't want to depend on specific glibc version.
|
||||||
|
|
||||||
|
#if !defined(RWF_NOWAIT)
|
||||||
|
#define RWF_NOWAIT 8
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#if !defined(SYS_preadv2)
|
||||||
|
#define SYS_preadv2 327 /// TODO: AArch64, PPC64
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif
|
||||||
|
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
@ -47,6 +64,7 @@ private:
|
|||||||
|
|
||||||
struct RequestInfo
|
struct RequestInfo
|
||||||
{
|
{
|
||||||
|
bool already_read = false;
|
||||||
Poco::Event event;
|
Poco::Event event;
|
||||||
Result result;
|
Result result;
|
||||||
};
|
};
|
||||||
@ -78,12 +96,79 @@ public:
|
|||||||
it = requests.emplace(std::piecewise_construct, std::forward_as_tuple(counter), std::forward_as_tuple()).first;
|
it = requests.emplace(std::piecewise_construct, std::forward_as_tuple(counter), std::forward_as_tuple()).first;
|
||||||
}
|
}
|
||||||
|
|
||||||
pool.scheduleOrThrow([request, info = &it->second]
|
int fd = assert_cast<const LocalFileDescriptor &>(*request.descriptor).fd;
|
||||||
|
|
||||||
|
RequestInfo & info = it->second;
|
||||||
|
|
||||||
|
#if defined(__linux__)
|
||||||
|
/// Check if data is already in page cache with preadv2 syscall.
|
||||||
|
|
||||||
|
/// TODO ProfileEvents for page cache hits and misses.
|
||||||
|
|
||||||
|
/// We don't want to depend on new Linux kernel.
|
||||||
|
static std::atomic<bool> has_preadv2_syscall{true};
|
||||||
|
|
||||||
|
if (has_preadv2_syscall.load(std::memory_order_relaxed))
|
||||||
|
{
|
||||||
|
size_t bytes_read = 0;
|
||||||
|
while (!bytes_read)
|
||||||
|
{
|
||||||
|
struct iovec io_vec{ .iov_base = request.buf, .iov_len = request.size };
|
||||||
|
ssize_t res = syscall(
|
||||||
|
SYS_preadv2, fd,
|
||||||
|
&io_vec, 1,
|
||||||
|
static_cast<long>(request.offset), static_cast<long>(request.offset >> 32),
|
||||||
|
RWF_NOWAIT);
|
||||||
|
|
||||||
|
if (!res)
|
||||||
|
{
|
||||||
|
info.already_read = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (-1 == res)
|
||||||
|
{
|
||||||
|
if (errno == ENOSYS)
|
||||||
|
{
|
||||||
|
has_preadv2_syscall.store(false, std::memory_order_relaxed);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
else if (errno == EAGAIN)
|
||||||
|
{
|
||||||
|
/// Data is not available.
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
else if (errno == EINTR)
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
info.already_read = true;
|
||||||
|
info.result.exception = std::make_exception_ptr(ErrnoException(
|
||||||
|
fmt::format("Cannot read from file {}, {}", fd,
|
||||||
|
errnoToString(ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR, errno)),
|
||||||
|
ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR, errno));
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
bytes_read += res;
|
||||||
|
info.already_read = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
info.result.size = bytes_read;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
if (!info.already_read)
|
||||||
|
{
|
||||||
|
pool.scheduleOrThrow([request, fd, &info]
|
||||||
{
|
{
|
||||||
setThreadName("ThreadPoolRead");
|
setThreadName("ThreadPoolRead");
|
||||||
|
|
||||||
int fd = assert_cast<const LocalFileDescriptor &>(*request.descriptor).fd;
|
|
||||||
|
|
||||||
/// TODO Instrumentation.
|
/// TODO Instrumentation.
|
||||||
|
|
||||||
size_t bytes_read = 0;
|
size_t bytes_read = 0;
|
||||||
@ -95,21 +180,21 @@ public:
|
|||||||
|
|
||||||
if (-1 == res && errno != EINTR)
|
if (-1 == res && errno != EINTR)
|
||||||
{
|
{
|
||||||
info->result.exception = std::make_exception_ptr(ErrnoException(
|
info.result.exception = std::make_exception_ptr(ErrnoException(
|
||||||
fmt::format("Cannot read from file {}, {}", fd,
|
fmt::format("Cannot read from file {}, {}", fd,
|
||||||
errnoToString(ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR, errno)),
|
errnoToString(ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR, errno)),
|
||||||
ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR, errno));
|
ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR, errno));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (res > 0)
|
|
||||||
bytes_read += res;
|
bytes_read += res;
|
||||||
}
|
}
|
||||||
|
|
||||||
info->result.size = bytes_read;
|
info.result.size = bytes_read;
|
||||||
info->event.set();
|
info.event.set();
|
||||||
},
|
},
|
||||||
request.priority);
|
request.priority);
|
||||||
|
}
|
||||||
|
|
||||||
return it->first;
|
return it->first;
|
||||||
}
|
}
|
||||||
@ -126,6 +211,8 @@ public:
|
|||||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot find request by id {}", id);
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot find request by id {}", id);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!it->second.already_read)
|
||||||
|
{
|
||||||
if (microseconds)
|
if (microseconds)
|
||||||
{
|
{
|
||||||
if (!it->second.event.tryWait(*microseconds / 1000))
|
if (!it->second.event.tryWait(*microseconds / 1000))
|
||||||
@ -133,6 +220,7 @@ public:
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
it->second.event.wait();
|
it->second.event.wait();
|
||||||
|
}
|
||||||
|
|
||||||
Result res = it->second.result;
|
Result res = it->second.result;
|
||||||
|
|
||||||
|
@ -42,7 +42,7 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
|
|||||||
|
|
||||||
static AsynchronousReaderPtr reader = std::make_shared<ThreadPoolReader>(16, 1000000);
|
static AsynchronousReaderPtr reader = std::make_shared<ThreadPoolReader>(16, 1000000);
|
||||||
//static AsynchronousReaderPtr reader = std::make_shared<SynchronousReader>();
|
//static AsynchronousReaderPtr reader = std::make_shared<SynchronousReader>();
|
||||||
static AsynchronousReaderPtr direct_reader = std::make_shared<AIOReader>(128, 1024);
|
//static AsynchronousReaderPtr direct_reader = std::make_shared<AIOReader>(128, 1024);
|
||||||
|
|
||||||
#if defined(OS_LINUX) || defined(__FreeBSD__)
|
#if defined(OS_LINUX) || defined(__FreeBSD__)
|
||||||
if (direct_io_threshold && estimated_size >= direct_io_threshold)
|
if (direct_io_threshold && estimated_size >= direct_io_threshold)
|
||||||
@ -84,7 +84,7 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
|
|||||||
try
|
try
|
||||||
{
|
{
|
||||||
auto res = std::make_unique<AsynchronousReadBufferFromFileWithCache>(
|
auto res = std::make_unique<AsynchronousReadBufferFromFileWithCache>(
|
||||||
direct_reader,
|
reader,
|
||||||
filename, buffer_size, (flags == -1 ? O_RDONLY | O_CLOEXEC : flags) | O_DIRECT, existing_memory, alignment);
|
filename, buffer_size, (flags == -1 ? O_RDONLY | O_CLOEXEC : flags) | O_DIRECT, existing_memory, alignment);
|
||||||
ProfileEvents::increment(ProfileEvents::CreatedReadBufferDirectIO);
|
ProfileEvents::increment(ProfileEvents::CreatedReadBufferDirectIO);
|
||||||
return res;
|
return res;
|
||||||
|
Loading…
Reference in New Issue
Block a user