2021-07-26 00:34:36 +00:00
|
|
|
#include <errno.h>
|
|
|
|
#include <time.h>
|
|
|
|
#include <optional>
|
|
|
|
#include <Common/ProfileEvents.h>
|
|
|
|
#include <Common/Stopwatch.h>
|
|
|
|
#include <Common/Exception.h>
|
|
|
|
#include <Common/CurrentMetrics.h>
|
|
|
|
#include <IO/AsynchronousReadBufferFromFileDescriptor.h>
|
|
|
|
#include <IO/WriteHelpers.h>
|
|
|
|
|
|
|
|
|
2021-08-27 00:08:10 +00:00
|
|
|
namespace ProfileEvents
|
|
|
|
{
|
|
|
|
extern const Event AsynchronousReadWaitMicroseconds;
|
|
|
|
}
|
|
|
|
|
|
|
|
namespace CurrentMetrics
|
|
|
|
{
|
|
|
|
extern const Metric AsynchronousReadWait;
|
|
|
|
}
|
|
|
|
|
|
|
|
|
2021-07-26 00:34:36 +00:00
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
2021-07-28 05:28:30 +00:00
|
|
|
namespace ErrorCodes
|
|
|
|
{
|
|
|
|
extern const int ARGUMENT_OUT_OF_BOUND;
|
2022-02-21 14:42:43 +00:00
|
|
|
extern const int LOGICAL_ERROR;
|
2021-07-28 05:28:30 +00:00
|
|
|
}
|
|
|
|
|
2021-07-26 00:34:36 +00:00
|
|
|
|
|
|
|
std::string AsynchronousReadBufferFromFileDescriptor::getFileName() const
|
|
|
|
{
|
|
|
|
return "(fd = " + toString(fd) + ")";
|
|
|
|
}
|
|
|
|
|
|
|
|
|
2021-08-25 00:13:05 +00:00
|
|
|
std::future<IAsynchronousReader::Result> AsynchronousReadBufferFromFileDescriptor::readInto(char * data, size_t size)
|
|
|
|
{
|
|
|
|
IAsynchronousReader::Request request;
|
|
|
|
request.descriptor = std::make_shared<IAsynchronousReader::LocalFileDescriptor>(fd);
|
|
|
|
request.buf = data;
|
|
|
|
request.size = size;
|
|
|
|
request.offset = file_offset_of_buffer_end;
|
|
|
|
request.priority = priority;
|
2022-02-16 04:26:51 +00:00
|
|
|
request.ignore = bytes_to_ignore;
|
|
|
|
bytes_to_ignore = 0;
|
2021-08-25 00:13:05 +00:00
|
|
|
|
2022-01-03 16:21:50 +00:00
|
|
|
/// This is a workaround of a read pass EOF bug in linux kernel with pread()
|
|
|
|
if (file_size.has_value() && file_offset_of_buffer_end >= *file_size)
|
|
|
|
{
|
|
|
|
return std::async(std::launch::deferred, []
|
|
|
|
{
|
|
|
|
return IAsynchronousReader::Result{ .size = 0, .offset = 0 };
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
2021-08-25 00:13:05 +00:00
|
|
|
return reader->submit(request);
|
|
|
|
}
|
|
|
|
|
|
|
|
|
2021-07-26 00:34:36 +00:00
|
|
|
void AsynchronousReadBufferFromFileDescriptor::prefetch()
|
|
|
|
{
|
2021-08-04 00:07:04 +00:00
|
|
|
if (prefetch_future.valid())
|
2021-07-26 00:34:36 +00:00
|
|
|
return;
|
|
|
|
|
|
|
|
/// Will request the same amount of data that is read in nextImpl.
|
|
|
|
prefetch_buffer.resize(internal_buffer.size());
|
2021-08-25 00:13:05 +00:00
|
|
|
prefetch_future = readInto(prefetch_buffer.data(), prefetch_buffer.size());
|
2021-07-26 00:34:36 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
bool AsynchronousReadBufferFromFileDescriptor::nextImpl()
|
|
|
|
{
|
2021-08-25 00:13:05 +00:00
|
|
|
if (prefetch_future.valid())
|
|
|
|
{
|
|
|
|
/// Read request already in flight. Wait for its completion.
|
2021-07-26 00:34:36 +00:00
|
|
|
|
2021-12-17 09:00:22 +00:00
|
|
|
size_t size = 0;
|
2022-02-16 04:26:51 +00:00
|
|
|
size_t offset = 0;
|
2021-08-27 00:08:10 +00:00
|
|
|
{
|
|
|
|
Stopwatch watch;
|
|
|
|
CurrentMetrics::Increment metric_increment{CurrentMetrics::AsynchronousReadWait};
|
2021-12-17 09:00:22 +00:00
|
|
|
auto result = prefetch_future.get();
|
|
|
|
size = result.size;
|
2022-02-16 04:26:51 +00:00
|
|
|
offset = result.offset;
|
|
|
|
assert(offset < size || size == 0);
|
2021-08-27 00:08:10 +00:00
|
|
|
ProfileEvents::increment(ProfileEvents::AsynchronousReadWaitMicroseconds, watch.elapsedMicroseconds());
|
|
|
|
}
|
|
|
|
|
2021-08-25 00:13:05 +00:00
|
|
|
prefetch_future = {};
|
|
|
|
file_offset_of_buffer_end += size;
|
2021-07-26 00:34:36 +00:00
|
|
|
|
2021-08-25 00:13:05 +00:00
|
|
|
if (size)
|
|
|
|
{
|
|
|
|
prefetch_buffer.swap(memory);
|
2022-02-16 04:26:51 +00:00
|
|
|
/// Adjust the working buffer so that it ignores `offset` bytes.
|
|
|
|
setWithBytesToIgnore(memory.data(), size, offset);
|
2021-08-25 00:13:05 +00:00
|
|
|
return true;
|
|
|
|
}
|
2021-07-26 00:34:36 +00:00
|
|
|
|
2021-08-25 00:13:05 +00:00
|
|
|
return false;
|
2021-07-26 00:34:36 +00:00
|
|
|
}
|
2021-08-25 00:13:05 +00:00
|
|
|
else
|
|
|
|
{
|
|
|
|
/// No pending request. Do synchronous read.
|
|
|
|
|
2022-02-16 04:26:51 +00:00
|
|
|
auto [size, offset] = readInto(memory.data(), memory.size()).get();
|
2021-08-25 00:13:05 +00:00
|
|
|
file_offset_of_buffer_end += size;
|
2021-07-26 00:34:36 +00:00
|
|
|
|
2021-08-25 00:13:05 +00:00
|
|
|
if (size)
|
|
|
|
{
|
2022-02-16 04:26:51 +00:00
|
|
|
/// Adjust the working buffer so that it ignores `offset` bytes.
|
|
|
|
setWithBytesToIgnore(memory.data(), size, offset);
|
2021-08-25 00:13:05 +00:00
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
return false;
|
|
|
|
}
|
2021-07-26 00:34:36 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2021-07-27 23:47:28 +00:00
|
|
|
void AsynchronousReadBufferFromFileDescriptor::finalize()
|
2021-07-26 00:34:36 +00:00
|
|
|
{
|
2021-08-04 00:07:04 +00:00
|
|
|
if (prefetch_future.valid())
|
2021-07-26 00:34:36 +00:00
|
|
|
{
|
2021-08-04 00:07:04 +00:00
|
|
|
prefetch_future.wait();
|
|
|
|
prefetch_future = {};
|
2021-07-26 00:34:36 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
2022-02-21 14:42:43 +00:00
|
|
|
/// Create a buffer that reads from `fd_` through `reader_` with the given request priority.
///
/// `alignment` is passed to ReadBufferFromFileBase (presumably for the main buffer's memory),
/// applied to the prefetch buffer, and kept as `required_alignment`. seek() uses it to round
/// file positions down. It must not exceed `buf_size`; otherwise LOGICAL_ERROR is thrown.
AsynchronousReadBufferFromFileDescriptor::AsynchronousReadBufferFromFileDescriptor(
    AsynchronousReaderPtr reader_,
    Int32 priority_,
    int fd_,
    size_t buf_size,
    char * existing_memory,
    size_t alignment,
    std::optional<size_t> file_size_)
    : ReadBufferFromFileBase(buf_size, existing_memory, alignment, file_size_)
    , reader(std::move(reader_))
    , priority(priority_)
    , required_alignment(alignment)
    , fd(fd_)
{
    const bool alignment_fits_in_buffer = required_alignment <= buf_size;
    if (!alignment_fits_in_buffer)
        throw Exception(
            ErrorCodes::LOGICAL_ERROR,
            "Too large alignment. Cannot have required_alignment greater than buf_size: {} > {}. It is a bug",
            required_alignment,
            buf_size);

    /// The prefetch buffer is swapped with the main memory in nextImpl, so give it the same alignment.
    prefetch_buffer.alignment = alignment;
}
|
|
|
|
|
2021-07-27 23:47:28 +00:00
|
|
|
/// Wait for any in-flight prefetch request before the members (including prefetch_buffer,
/// which that request writes into) are destroyed.
AsynchronousReadBufferFromFileDescriptor::~AsynchronousReadBufferFromFileDescriptor()
{
    AsynchronousReadBufferFromFileDescriptor::finalize();
}
|
|
|
|
|
|
|
|
|
2021-07-26 00:34:36 +00:00
|
|
|
/// If 'offset' is small enough to stay in buffer after seek, then true seek in file does not happen.
|
|
|
|
off_t AsynchronousReadBufferFromFileDescriptor::seek(off_t offset, int whence)
|
|
|
|
{
|
|
|
|
size_t new_pos;
|
|
|
|
if (whence == SEEK_SET)
|
|
|
|
{
|
|
|
|
assert(offset >= 0);
|
|
|
|
new_pos = offset;
|
|
|
|
}
|
|
|
|
else if (whence == SEEK_CUR)
|
|
|
|
{
|
|
|
|
new_pos = file_offset_of_buffer_end - (working_buffer.end() - pos) + offset;
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
throw Exception("ReadBufferFromFileDescriptor::seek expects SEEK_SET or SEEK_CUR as whence", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Position is unchanged.
|
|
|
|
if (new_pos + (working_buffer.end() - pos) == file_offset_of_buffer_end)
|
|
|
|
return new_pos;
|
|
|
|
|
2022-02-16 04:26:51 +00:00
|
|
|
while (true)
|
2021-07-26 00:34:36 +00:00
|
|
|
{
|
2022-02-16 04:26:51 +00:00
|
|
|
if (file_offset_of_buffer_end - working_buffer.size() <= new_pos && new_pos <= file_offset_of_buffer_end)
|
|
|
|
{
|
|
|
|
/// Position is still inside the buffer.
|
|
|
|
/// Probably it is at the end of the buffer - then we will load data on the following 'next' call.
|
2021-07-26 00:34:36 +00:00
|
|
|
|
2022-02-16 04:26:51 +00:00
|
|
|
pos = working_buffer.end() - file_offset_of_buffer_end + new_pos;
|
|
|
|
assert(pos >= working_buffer.begin());
|
|
|
|
assert(pos <= working_buffer.end());
|
2021-07-26 00:34:36 +00:00
|
|
|
|
2022-02-16 04:26:51 +00:00
|
|
|
return new_pos;
|
|
|
|
}
|
|
|
|
else if (prefetch_future.valid())
|
2021-07-26 00:34:36 +00:00
|
|
|
{
|
2022-02-16 04:26:51 +00:00
|
|
|
/// Read from prefetch buffer and recheck if the new position is valid inside.
|
|
|
|
|
|
|
|
if (nextImpl())
|
|
|
|
continue;
|
2021-07-26 00:34:36 +00:00
|
|
|
}
|
|
|
|
|
2022-02-16 04:26:51 +00:00
|
|
|
break;
|
|
|
|
}
|
2021-07-26 00:34:36 +00:00
|
|
|
|
2022-02-16 04:26:51 +00:00
|
|
|
assert(!prefetch_future.valid());
|
2021-07-26 00:34:36 +00:00
|
|
|
|
2022-02-16 04:26:51 +00:00
|
|
|
/// Position is out of the buffer, we need to do real seek.
|
|
|
|
off_t seek_pos = required_alignment > 1
|
|
|
|
? new_pos / required_alignment * required_alignment
|
|
|
|
: new_pos;
|
2021-07-26 00:34:36 +00:00
|
|
|
|
2022-02-16 04:26:51 +00:00
|
|
|
/// First reset the buffer so the next read will fetch new data to the buffer.
|
|
|
|
resetWorkingBuffer();
|
2021-07-26 00:34:36 +00:00
|
|
|
|
2022-02-16 04:26:51 +00:00
|
|
|
/// Just update the info about the next position in file.
|
2021-07-26 00:34:36 +00:00
|
|
|
|
2022-02-16 04:26:51 +00:00
|
|
|
file_offset_of_buffer_end = seek_pos;
|
|
|
|
bytes_to_ignore = new_pos - seek_pos;
|
2021-07-26 00:34:36 +00:00
|
|
|
|
2022-02-21 14:42:43 +00:00
|
|
|
assert(bytes_to_ignore < internal_buffer.size());
|
|
|
|
|
2022-02-16 04:26:51 +00:00
|
|
|
return seek_pos;
|
2021-07-26 00:34:36 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
void AsynchronousReadBufferFromFileDescriptor::rewind()
|
|
|
|
{
|
2021-08-04 00:07:04 +00:00
|
|
|
if (prefetch_future.valid())
|
2021-07-26 01:51:12 +00:00
|
|
|
{
|
2021-08-04 00:07:04 +00:00
|
|
|
prefetch_future.wait();
|
|
|
|
prefetch_future = {};
|
2021-07-26 01:51:12 +00:00
|
|
|
}
|
|
|
|
|
2021-07-26 00:34:36 +00:00
|
|
|
/// Clearing the buffer with existing data. New data will be read on subsequent call to 'next'.
|
|
|
|
working_buffer.resize(0);
|
|
|
|
pos = working_buffer.begin();
|
|
|
|
file_offset_of_buffer_end = 0;
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|