mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-25 11:10:49 +00:00
Fix bad conflict resolving
This commit is contained in:
parent
bba67a8b87
commit
c0ae93107f
@ -1,312 +0,0 @@
|
|||||||
#if defined(OS_LINUX) || defined(__FreeBSD__)
|
|
||||||
|
|
||||||
#include <IO/ReadBufferAIO.h>
|
|
||||||
#include <IO/AIOContextPool.h>
|
|
||||||
#include <Common/ProfileEvents.h>
|
|
||||||
#include <Common/Stopwatch.h>
|
|
||||||
#include <Common/MemorySanitizer.h>
|
|
||||||
#include <Core/Defines.h>
|
|
||||||
|
|
||||||
#include <sys/types.h>
|
|
||||||
#include <sys/stat.h>
|
|
||||||
#include <errno.h>
|
|
||||||
|
|
||||||
#include <optional>
|
|
||||||
|
|
||||||
|
|
||||||
namespace ProfileEvents
|
|
||||||
{
|
|
||||||
extern const Event FileOpen;
|
|
||||||
extern const Event ReadBufferAIORead;
|
|
||||||
extern const Event ReadBufferAIOReadBytes;
|
|
||||||
}
|
|
||||||
|
|
||||||
namespace CurrentMetrics
|
|
||||||
{
|
|
||||||
extern const Metric Read;
|
|
||||||
}
|
|
||||||
|
|
||||||
namespace DB
|
|
||||||
{
|
|
||||||
|
|
||||||
namespace ErrorCodes
|
|
||||||
{
|
|
||||||
extern const int FILE_DOESNT_EXIST;
|
|
||||||
extern const int CANNOT_OPEN_FILE;
|
|
||||||
extern const int LOGICAL_ERROR;
|
|
||||||
extern const int ARGUMENT_OUT_OF_BOUND;
|
|
||||||
extern const int AIO_READ_ERROR;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/// Note: an additional page is allocated that will contain the data that
|
|
||||||
/// does not fit into the main buffer.
|
|
||||||
ReadBufferAIO::ReadBufferAIO(const std::string & filename_, size_t buffer_size_, int flags_, char * existing_memory_)
|
|
||||||
: ReadBufferFromFileBase(buffer_size_ + DEFAULT_AIO_FILE_BLOCK_SIZE, existing_memory_, DEFAULT_AIO_FILE_BLOCK_SIZE),
|
|
||||||
fill_buffer(BufferWithOwnMemory<ReadBuffer>(internalBuffer().size(), nullptr, DEFAULT_AIO_FILE_BLOCK_SIZE)),
|
|
||||||
filename(filename_)
|
|
||||||
{
|
|
||||||
ProfileEvents::increment(ProfileEvents::FileOpen);
|
|
||||||
|
|
||||||
int open_flags = (flags_ == -1) ? O_RDONLY : flags_;
|
|
||||||
open_flags |= O_DIRECT;
|
|
||||||
open_flags |= O_CLOEXEC;
|
|
||||||
|
|
||||||
fd = ::open(filename.c_str(), open_flags);
|
|
||||||
if (fd == -1)
|
|
||||||
{
|
|
||||||
auto error_code = (errno == ENOENT) ? ErrorCodes::FILE_DOESNT_EXIST : ErrorCodes::CANNOT_OPEN_FILE;
|
|
||||||
throwFromErrnoWithPath("Cannot open file " + filename, filename, error_code);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
ReadBufferAIO::~ReadBufferAIO()
|
|
||||||
{
|
|
||||||
if (!aio_failed)
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
(void) waitForAIOCompletion();
|
|
||||||
}
|
|
||||||
catch (...)
|
|
||||||
{
|
|
||||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (fd != -1)
|
|
||||||
::close(fd);
|
|
||||||
}
|
|
||||||
|
|
||||||
void ReadBufferAIO::setMaxBytes(size_t max_bytes_read_)
|
|
||||||
{
|
|
||||||
if (is_started)
|
|
||||||
throw Exception("Illegal attempt to set the maximum number of bytes to read from file " + filename, ErrorCodes::LOGICAL_ERROR);
|
|
||||||
max_bytes_read = max_bytes_read_;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool ReadBufferAIO::nextImpl()
|
|
||||||
{
|
|
||||||
/// If the end of the file has already been reached by calling this function,
|
|
||||||
/// then the current call is wrong.
|
|
||||||
if (is_eof)
|
|
||||||
return false;
|
|
||||||
|
|
||||||
std::optional<Stopwatch> watch;
|
|
||||||
if (profile_callback)
|
|
||||||
watch.emplace(clock_type);
|
|
||||||
|
|
||||||
if (!is_pending_read)
|
|
||||||
synchronousRead();
|
|
||||||
else
|
|
||||||
receive();
|
|
||||||
|
|
||||||
if (profile_callback)
|
|
||||||
{
|
|
||||||
ProfileInfo info;
|
|
||||||
info.bytes_requested = requested_byte_count;
|
|
||||||
info.bytes_read = bytes_read;
|
|
||||||
info.nanoseconds = watch->elapsed(); //-V1007
|
|
||||||
profile_callback(info);
|
|
||||||
}
|
|
||||||
|
|
||||||
is_started = true;
|
|
||||||
|
|
||||||
/// If the end of the file is just reached, do nothing else.
|
|
||||||
if (is_eof)
|
|
||||||
return bytes_read != 0;
|
|
||||||
|
|
||||||
/// Create an asynchronous request.
|
|
||||||
prepare();
|
|
||||||
|
|
||||||
#if defined(__FreeBSD__)
|
|
||||||
request.aio.aio_lio_opcode = LIO_READ;
|
|
||||||
request.aio.aio_fildes = fd;
|
|
||||||
request.aio.aio_buf = reinterpret_cast<volatile void *>(buffer_begin);
|
|
||||||
request.aio.aio_nbytes = region_aligned_size;
|
|
||||||
request.aio.aio_offset = region_aligned_begin;
|
|
||||||
#else
|
|
||||||
request.aio_lio_opcode = IOCB_CMD_PREAD;
|
|
||||||
request.aio_fildes = fd;
|
|
||||||
request.aio_buf = reinterpret_cast<UInt64>(buffer_begin);
|
|
||||||
request.aio_nbytes = region_aligned_size;
|
|
||||||
request.aio_offset = region_aligned_begin;
|
|
||||||
#endif
|
|
||||||
|
|
||||||
/// Send the request.
|
|
||||||
try
|
|
||||||
{
|
|
||||||
future_bytes_read = AIOContextPool::instance().post(request);
|
|
||||||
}
|
|
||||||
catch (...)
|
|
||||||
{
|
|
||||||
aio_failed = true;
|
|
||||||
throw;
|
|
||||||
}
|
|
||||||
|
|
||||||
is_pending_read = true;
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
off_t ReadBufferAIO::seek(off_t off, int whence)
|
|
||||||
{
|
|
||||||
off_t new_pos_in_file;
|
|
||||||
|
|
||||||
if (whence == SEEK_SET)
|
|
||||||
{
|
|
||||||
if (off < 0)
|
|
||||||
throw Exception("SEEK_SET underflow", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
|
|
||||||
new_pos_in_file = off;
|
|
||||||
}
|
|
||||||
else if (whence == SEEK_CUR)
|
|
||||||
{
|
|
||||||
if (off >= 0)
|
|
||||||
{
|
|
||||||
if (off > (std::numeric_limits<off_t>::max() - getPosition()))
|
|
||||||
throw Exception("SEEK_CUR overflow", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
|
|
||||||
}
|
|
||||||
else if (off < -getPosition())
|
|
||||||
throw Exception("SEEK_CUR underflow", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
|
|
||||||
new_pos_in_file = getPosition() + off;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
throw Exception("ReadBufferAIO::seek expects SEEK_SET or SEEK_CUR as whence", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
|
|
||||||
|
|
||||||
if (new_pos_in_file != getPosition())
|
|
||||||
{
|
|
||||||
off_t first_read_pos_in_file = first_unread_pos_in_file - static_cast<off_t>(working_buffer.size());
|
|
||||||
if (hasPendingData() && (new_pos_in_file >= first_read_pos_in_file) && (new_pos_in_file <= first_unread_pos_in_file))
|
|
||||||
{
|
|
||||||
/// Moved, but remained within the buffer.
|
|
||||||
pos = working_buffer.begin() + (new_pos_in_file - first_read_pos_in_file);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
/// Moved past the buffer.
|
|
||||||
pos = working_buffer.end();
|
|
||||||
first_unread_pos_in_file = new_pos_in_file;
|
|
||||||
|
|
||||||
/// If we go back, than it's not eof
|
|
||||||
is_eof = false;
|
|
||||||
|
|
||||||
/// We can not use the result of the current asynchronous request.
|
|
||||||
skip();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return new_pos_in_file;
|
|
||||||
}
|
|
||||||
|
|
||||||
void ReadBufferAIO::synchronousRead()
|
|
||||||
{
|
|
||||||
CurrentMetrics::Increment metric_increment_read{CurrentMetrics::Read};
|
|
||||||
|
|
||||||
prepare();
|
|
||||||
bytes_read = ::pread(fd, buffer_begin, region_aligned_size, region_aligned_begin);
|
|
||||||
|
|
||||||
ProfileEvents::increment(ProfileEvents::ReadBufferAIORead);
|
|
||||||
ProfileEvents::increment(ProfileEvents::ReadBufferAIOReadBytes, bytes_read);
|
|
||||||
|
|
||||||
finalize();
|
|
||||||
}
|
|
||||||
|
|
||||||
void ReadBufferAIO::receive()
|
|
||||||
{
|
|
||||||
if (!waitForAIOCompletion())
|
|
||||||
{
|
|
||||||
throw Exception("Trying to receive data from AIO, but nothing was queued. It's a bug", ErrorCodes::LOGICAL_ERROR);
|
|
||||||
}
|
|
||||||
finalize();
|
|
||||||
}
|
|
||||||
|
|
||||||
void ReadBufferAIO::skip()
|
|
||||||
{
|
|
||||||
if (!waitForAIOCompletion())
|
|
||||||
return;
|
|
||||||
|
|
||||||
/// @todo I presume this assignment is redundant since waitForAIOCompletion() performs a similar one
|
|
||||||
// bytes_read = future_bytes_read.get();
|
|
||||||
if ((bytes_read < 0) || (static_cast<size_t>(bytes_read) < region_left_padding))
|
|
||||||
throw Exception("Asynchronous read error on file " + filename, ErrorCodes::AIO_READ_ERROR);
|
|
||||||
}
|
|
||||||
|
|
||||||
bool ReadBufferAIO::waitForAIOCompletion()
|
|
||||||
{
|
|
||||||
if (is_eof || !is_pending_read)
|
|
||||||
return false;
|
|
||||||
|
|
||||||
CurrentMetrics::Increment metric_increment_read{CurrentMetrics::Read};
|
|
||||||
|
|
||||||
bytes_read = future_bytes_read.get();
|
|
||||||
is_pending_read = false;
|
|
||||||
|
|
||||||
ProfileEvents::increment(ProfileEvents::ReadBufferAIORead);
|
|
||||||
ProfileEvents::increment(ProfileEvents::ReadBufferAIOReadBytes, bytes_read);
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
void ReadBufferAIO::prepare()
|
|
||||||
{
|
|
||||||
requested_byte_count = std::min(fill_buffer.internalBuffer().size() - DEFAULT_AIO_FILE_BLOCK_SIZE, max_bytes_read);
|
|
||||||
|
|
||||||
/// Region of the disk from which we want to read data.
|
|
||||||
const off_t region_begin = first_unread_pos_in_file;
|
|
||||||
|
|
||||||
if ((requested_byte_count > static_cast<size_t>(std::numeric_limits<off_t>::max())) ||
|
|
||||||
(first_unread_pos_in_file > (std::numeric_limits<off_t>::max() - static_cast<off_t>(requested_byte_count))))
|
|
||||||
throw Exception("An overflow occurred during file operation", ErrorCodes::LOGICAL_ERROR);
|
|
||||||
|
|
||||||
const off_t region_end = first_unread_pos_in_file + requested_byte_count;
|
|
||||||
|
|
||||||
/// The aligned region of the disk from which we will read the data.
|
|
||||||
region_left_padding = region_begin % DEFAULT_AIO_FILE_BLOCK_SIZE;
|
|
||||||
const size_t region_right_padding = (DEFAULT_AIO_FILE_BLOCK_SIZE - (region_end % DEFAULT_AIO_FILE_BLOCK_SIZE)) % DEFAULT_AIO_FILE_BLOCK_SIZE;
|
|
||||||
|
|
||||||
region_aligned_begin = region_begin - region_left_padding;
|
|
||||||
|
|
||||||
if (region_end > (std::numeric_limits<off_t>::max() - static_cast<off_t>(region_right_padding)))
|
|
||||||
throw Exception("An overflow occurred during file operation", ErrorCodes::LOGICAL_ERROR);
|
|
||||||
|
|
||||||
const off_t region_aligned_end = region_end + region_right_padding;
|
|
||||||
region_aligned_size = region_aligned_end - region_aligned_begin;
|
|
||||||
|
|
||||||
buffer_begin = fill_buffer.internalBuffer().begin();
|
|
||||||
|
|
||||||
/// Unpoison because msan doesn't instrument linux AIO
|
|
||||||
__msan_unpoison(buffer_begin, fill_buffer.internalBuffer().size());
|
|
||||||
}
|
|
||||||
|
|
||||||
void ReadBufferAIO::finalize()
|
|
||||||
{
|
|
||||||
if ((bytes_read < 0) || (static_cast<size_t>(bytes_read) < region_left_padding))
|
|
||||||
throw Exception("Asynchronous read error on file " + filename, ErrorCodes::AIO_READ_ERROR);
|
|
||||||
|
|
||||||
/// Ignore redundant bytes on the left.
|
|
||||||
bytes_read -= region_left_padding;
|
|
||||||
|
|
||||||
/// Ignore redundant bytes on the right.
|
|
||||||
bytes_read = std::min(static_cast<off_t>(bytes_read), static_cast<off_t>(requested_byte_count));
|
|
||||||
|
|
||||||
if (bytes_read > 0)
|
|
||||||
fill_buffer.buffer().resize(region_left_padding + bytes_read);
|
|
||||||
if (static_cast<size_t>(bytes_read) < requested_byte_count)
|
|
||||||
is_eof = true;
|
|
||||||
|
|
||||||
if (first_unread_pos_in_file > (std::numeric_limits<off_t>::max() - bytes_read))
|
|
||||||
throw Exception("An overflow occurred during file operation", ErrorCodes::LOGICAL_ERROR);
|
|
||||||
|
|
||||||
first_unread_pos_in_file += bytes_read;
|
|
||||||
total_bytes_read += bytes_read;
|
|
||||||
nextimpl_working_buffer_offset = region_left_padding;
|
|
||||||
|
|
||||||
if (total_bytes_read == max_bytes_read)
|
|
||||||
is_eof = true;
|
|
||||||
|
|
||||||
/// Swap the main and duplicate buffers.
|
|
||||||
swap(fill_buffer);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
#endif
|
|
@ -1,111 +0,0 @@
|
|||||||
#pragma once
|
|
||||||
|
|
||||||
#if defined(OS_LINUX) || defined(__FreeBSD__)
|
|
||||||
|
|
||||||
#include <IO/ReadBufferFromFileBase.h>
|
|
||||||
#include <IO/ReadBuffer.h>
|
|
||||||
#include <IO/BufferWithOwnMemory.h>
|
|
||||||
#include <IO/AIO.h>
|
|
||||||
#include <Core/Defines.h>
|
|
||||||
#include <Common/CurrentMetrics.h>
|
|
||||||
#include <string>
|
|
||||||
#include <limits>
|
|
||||||
#include <future>
|
|
||||||
#include <unistd.h>
|
|
||||||
#include <fcntl.h>
|
|
||||||
|
|
||||||
|
|
||||||
namespace CurrentMetrics
|
|
||||||
{
|
|
||||||
extern const Metric OpenFileForRead;
|
|
||||||
}
|
|
||||||
|
|
||||||
namespace DB
|
|
||||||
{
|
|
||||||
|
|
||||||
/** Class for asynchronous data reading.
|
|
||||||
*/
|
|
||||||
class ReadBufferAIO final : public ReadBufferFromFileBase
|
|
||||||
{
|
|
||||||
public:
|
|
||||||
ReadBufferAIO(const std::string & filename_, size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE, int flags_ = -1,
|
|
||||||
char * existing_memory_ = nullptr);
|
|
||||||
~ReadBufferAIO() override;
|
|
||||||
|
|
||||||
ReadBufferAIO(const ReadBufferAIO &) = delete;
|
|
||||||
ReadBufferAIO & operator=(const ReadBufferAIO &) = delete;
|
|
||||||
|
|
||||||
void setMaxBytes(size_t max_bytes_read_);
|
|
||||||
off_t getPosition() override { return first_unread_pos_in_file - (working_buffer.end() - pos); }
|
|
||||||
std::string getFileName() const override { return filename; }
|
|
||||||
int getFD() const { return fd; }
|
|
||||||
|
|
||||||
off_t seek(off_t off, int whence) override;
|
|
||||||
|
|
||||||
private:
|
|
||||||
///
|
|
||||||
bool nextImpl() override;
|
|
||||||
/// Synchronously read the data.
|
|
||||||
void synchronousRead();
|
|
||||||
/// Get data from an asynchronous request.
|
|
||||||
void receive();
|
|
||||||
/// Ignore data from an asynchronous request.
|
|
||||||
void skip();
|
|
||||||
/// Wait for the end of the current asynchronous task.
|
|
||||||
bool waitForAIOCompletion();
|
|
||||||
/// Prepare the request.
|
|
||||||
void prepare();
|
|
||||||
/// Prepare for reading a duplicate buffer containing data from
|
|
||||||
/// of the last request.
|
|
||||||
void finalize();
|
|
||||||
|
|
||||||
private:
|
|
||||||
/// Buffer for asynchronous data read operations.
|
|
||||||
BufferWithOwnMemory<ReadBuffer> fill_buffer;
|
|
||||||
|
|
||||||
/// Description of the asynchronous read request.
|
|
||||||
iocb request{};
|
|
||||||
std::future<ssize_t> future_bytes_read;
|
|
||||||
|
|
||||||
const std::string filename;
|
|
||||||
|
|
||||||
/// The maximum number of bytes that can be read.
|
|
||||||
size_t max_bytes_read = std::numeric_limits<size_t>::max();
|
|
||||||
/// Number of bytes requested.
|
|
||||||
size_t requested_byte_count = 0;
|
|
||||||
/// The number of bytes read at the last request.
|
|
||||||
ssize_t bytes_read = 0;
|
|
||||||
/// The total number of bytes read.
|
|
||||||
size_t total_bytes_read = 0;
|
|
||||||
|
|
||||||
/// The position of the first unread byte in the file.
|
|
||||||
off_t first_unread_pos_in_file = 0;
|
|
||||||
|
|
||||||
/// The starting position of the aligned region of the disk from which the data is read.
|
|
||||||
off_t region_aligned_begin = 0;
|
|
||||||
/// Left offset to align the region of the disk.
|
|
||||||
size_t region_left_padding = 0;
|
|
||||||
/// The size of the aligned region of the disk.
|
|
||||||
size_t region_aligned_size = 0;
|
|
||||||
|
|
||||||
/// The file descriptor for read.
|
|
||||||
int fd = -1;
|
|
||||||
|
|
||||||
/// The buffer to which the received data is written.
|
|
||||||
Position buffer_begin = nullptr;
|
|
||||||
|
|
||||||
/// The asynchronous read operation is not yet completed.
|
|
||||||
bool is_pending_read = false;
|
|
||||||
/// The end of the file is reached.
|
|
||||||
bool is_eof = false;
|
|
||||||
/// At least one read request was sent.
|
|
||||||
bool is_started = false;
|
|
||||||
/// Did the asynchronous operation fail?
|
|
||||||
bool aio_failed = false;
|
|
||||||
|
|
||||||
CurrentMetrics::Increment metric_increment{CurrentMetrics::OpenFileForRead};
|
|
||||||
};
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
#endif
|
|
Loading…
Reference in New Issue
Block a user