dbms: Server: feature development. [#METR-15090]

This commit is contained in:
Alexey Arno 2015-04-05 20:09:23 +03:00
parent bc9ec44a61
commit 50d103a4c2
2 changed files with 12 additions and 34 deletions

View File

@ -68,7 +68,6 @@ private:
Position buffer_begin = nullptr; Position buffer_begin = nullptr;
off_t region_aligned_size = 0; off_t region_aligned_size = 0;
bool got_exception = false;
/// Асинхронная операция чтения ещё не завершилась. /// Асинхронная операция чтения ещё не завершилась.
bool is_pending_read = false; bool is_pending_read = false;
@ -76,6 +75,7 @@ private:
bool is_eof = false; bool is_eof = false;
/// Был отправлен хоть один запрос на асинхронную операцию чтения. /// Был отправлен хоть один запрос на асинхронную операцию чтения.
bool is_started = false; bool is_started = false;
bool aio_failed = false;
}; };
} }

View File

@ -25,7 +25,6 @@ ReadBufferAIO::ReadBufferAIO(const std::string & filename_, size_t buffer_size_,
fd = ::open(filename.c_str(), open_flags); fd = ::open(filename.c_str(), open_flags);
if (fd == -1) if (fd == -1)
{ {
got_exception = true;
auto error_code = (errno == ENOENT) ? ErrorCodes::FILE_DOESNT_EXIST : ErrorCodes::CANNOT_OPEN_FILE; auto error_code = (errno == ENOENT) ? ErrorCodes::FILE_DOESNT_EXIST : ErrorCodes::CANNOT_OPEN_FILE;
throwFromErrno("Cannot open file " + filename, error_code); throwFromErrno("Cannot open file " + filename, error_code);
} }
@ -35,13 +34,16 @@ ReadBufferAIO::ReadBufferAIO(const std::string & filename_, size_t buffer_size_,
ReadBufferAIO::~ReadBufferAIO() ReadBufferAIO::~ReadBufferAIO()
{ {
try if (!aio_failed)
{ {
(void) waitForAIOCompletion(); try
} {
catch (...) (void) waitForAIOCompletion();
{ }
tryLogCurrentException(__PRETTY_FUNCTION__); catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
} }
if (fd != -1) if (fd != -1)
@ -51,10 +53,7 @@ ReadBufferAIO::~ReadBufferAIO()
void ReadBufferAIO::setMaxBytes(size_t max_bytes_read_) void ReadBufferAIO::setMaxBytes(size_t max_bytes_read_)
{ {
if (is_started) if (is_started)
{
got_exception = true;
throw Exception("Illegal attempt to set the maximum number of bytes to read from file " + filename, ErrorCodes::LOGICAL_ERROR); throw Exception("Illegal attempt to set the maximum number of bytes to read from file " + filename, ErrorCodes::LOGICAL_ERROR);
}
max_bytes_read = max_bytes_read_; max_bytes_read = max_bytes_read_;
} }
@ -70,10 +69,7 @@ off_t ReadBufferAIO::doSeek(off_t off, int whence)
if (whence == SEEK_SET) if (whence == SEEK_SET)
{ {
if (off < 0) if (off < 0)
{
got_exception = true;
throw Exception("SEEK_SET underflow", ErrorCodes::ARGUMENT_OUT_OF_BOUND); throw Exception("SEEK_SET underflow", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
}
new_pos = off; new_pos = off;
} }
else if (whence == SEEK_CUR) else if (whence == SEEK_CUR)
@ -81,23 +77,14 @@ off_t ReadBufferAIO::doSeek(off_t off, int whence)
if (off >= 0) if (off >= 0)
{ {
if (off > (std::numeric_limits<off_t>::max() - getPositionInFileRelaxed())) if (off > (std::numeric_limits<off_t>::max() - getPositionInFileRelaxed()))
{
got_exception = true;
throw Exception("SEEK_CUR overflow", ErrorCodes::ARGUMENT_OUT_OF_BOUND); throw Exception("SEEK_CUR overflow", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
}
} }
else if (off < -getPositionInFileRelaxed()) else if (off < -getPositionInFileRelaxed())
{
got_exception = true;
throw Exception("SEEK_CUR underflow", ErrorCodes::ARGUMENT_OUT_OF_BOUND); throw Exception("SEEK_CUR underflow", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
}
new_pos = getPositionInFileRelaxed() + off; new_pos = getPositionInFileRelaxed() + off;
} }
else else
{
got_exception = true;
throw Exception("ReadBufferAIO::seek expects SEEK_SET or SEEK_CUR as whence", ErrorCodes::ARGUMENT_OUT_OF_BOUND); throw Exception("ReadBufferAIO::seek expects SEEK_SET or SEEK_CUR as whence", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
}
if (new_pos != getPositionInFileRelaxed()) if (new_pos != getPositionInFileRelaxed())
{ {
@ -158,7 +145,7 @@ bool ReadBufferAIO::nextImpl()
{ {
if (errno != EINTR) if (errno != EINTR)
{ {
got_exception = true; aio_failed = true;
throw Exception("Cannot submit request for asynchronous IO on file " + filename, ErrorCodes::AIO_SUBMIT_ERROR); throw Exception("Cannot submit request for asynchronous IO on file " + filename, ErrorCodes::AIO_SUBMIT_ERROR);
} }
} }
@ -198,10 +185,7 @@ void ReadBufferAIO::skipPendingAIO()
size_t region_left_padding = pos_in_file % DEFAULT_AIO_FILE_BLOCK_SIZE; size_t region_left_padding = pos_in_file % DEFAULT_AIO_FILE_BLOCK_SIZE;
if ((bytes_read < 0) || (static_cast<size_t>(bytes_read) < region_left_padding)) if ((bytes_read < 0) || (static_cast<size_t>(bytes_read) < region_left_padding))
{
got_exception = true;
throw Exception("Asynchronous read error on file " + filename, ErrorCodes::AIO_READ_ERROR); throw Exception("Asynchronous read error on file " + filename, ErrorCodes::AIO_READ_ERROR);
}
} }
bool ReadBufferAIO::waitForAIOCompletion() bool ReadBufferAIO::waitForAIOCompletion()
@ -213,7 +197,7 @@ bool ReadBufferAIO::waitForAIOCompletion()
{ {
if (errno != EINTR) if (errno != EINTR)
{ {
got_exception = true; aio_failed = true;
throw Exception("Failed to wait for asynchronous IO completion on file " + filename, ErrorCodes::AIO_COMPLETION_ERROR); throw Exception("Failed to wait for asynchronous IO completion on file " + filename, ErrorCodes::AIO_COMPLETION_ERROR);
} }
} }
@ -257,10 +241,7 @@ void ReadBufferAIO::publishReceivedData()
size_t region_left_padding = pos_in_file % DEFAULT_AIO_FILE_BLOCK_SIZE; size_t region_left_padding = pos_in_file % DEFAULT_AIO_FILE_BLOCK_SIZE;
if ((bytes_read < 0) || (static_cast<size_t>(bytes_read) < region_left_padding)) if ((bytes_read < 0) || (static_cast<size_t>(bytes_read) < region_left_padding))
{
got_exception = true;
throw Exception("Asynchronous read error on file " + filename, ErrorCodes::AIO_READ_ERROR); throw Exception("Asynchronous read error on file " + filename, ErrorCodes::AIO_READ_ERROR);
}
/// Игнорируем излишние байты слева. /// Игнорируем излишние байты слева.
bytes_read -= region_left_padding; bytes_read -= region_left_padding;
@ -269,10 +250,7 @@ void ReadBufferAIO::publishReceivedData()
bytes_read = std::min(bytes_read, static_cast<off_t>(requested_byte_count)); bytes_read = std::min(bytes_read, static_cast<off_t>(requested_byte_count));
if (pos_in_file > (std::numeric_limits<off_t>::max() - bytes_read)) if (pos_in_file > (std::numeric_limits<off_t>::max() - bytes_read))
{
got_exception = true;
throw Exception("File position overflowed", ErrorCodes::LOGICAL_ERROR); throw Exception("File position overflowed", ErrorCodes::LOGICAL_ERROR);
}
::memmove(buffer_begin, buffer_begin + region_left_padding, bytes_read); ::memmove(buffer_begin, buffer_begin + region_left_padding, bytes_read);