dbms: Server: feature development. [#METR-15090]

This commit is contained in:
Alexey Arno 2015-04-05 18:54:16 +03:00
parent 1a628a512f
commit ad1c5a36cf
2 changed files with 66 additions and 78 deletions

View File

@ -41,10 +41,10 @@ private:
void publishReceivedData();
void sync();
/// Ждать окончания текущей асинхронной задачи.
void waitForAIOCompletion();
bool waitForAIOCompletion();
/// Менять местами основной и дублирующий буферы.
void swapBuffers() noexcept;
void skipLastRequest();
void skipPendingAIO();
private:
/// Буфер для асинхронных операций чтения данных.
@ -68,11 +68,10 @@ private:
Position buffer_begin = nullptr;
off_t region_aligned_size = 0;
bool got_exception = false;
/// Асинхронная операция чтения ещё не завершилась.
bool is_pending_read = false;
/// Было получено исключение.
bool got_exception = false;
/// Конец файла достигнут.
bool is_eof = false;
/// Был отправлен хоть один запрос на асинхронную операцию чтения.

View File

@ -61,6 +61,11 @@ void ReadBufferAIO::setMaxBytes(size_t max_bytes_read_)
max_bytes_read = max_bytes_read_;
}
off_t ReadBufferAIO::getPositionInFile()
{
return seek(0, SEEK_CUR);
}
off_t ReadBufferAIO::doSeek(off_t off, int whence)
{
off_t new_pos;
@ -112,17 +117,12 @@ off_t ReadBufferAIO::doSeek(off_t off, int whence)
}
/// Сдвинулись, значит не можем использовать результат последнего асинхронного запроса.
skipLastRequest();
skipPendingAIO();
}
return new_pos;
}
off_t ReadBufferAIO::getPositionInFile()
{
return seek(0, SEEK_CUR);
}
off_t ReadBufferAIO::getPositionInFileRelaxed() const noexcept
{
return pos_in_file - (working_buffer.end() - pos);
@ -178,6 +178,62 @@ void ReadBufferAIO::synchronousRead()
swapBuffers();
}
void ReadBufferAIO::sync()
{
if (!waitForAIOCompletion())
return;
publishReceivedData();
swapBuffers();
}
void ReadBufferAIO::skipPendingAIO()
{
if (!waitForAIOCompletion())
return;
is_started = false;
/// Несмотря на то, что не станем использовать результат последнего запроса,
/// убедимся, что запрос правильно выполнен.
bytes_read = events[0].res;
size_t region_left_padding = pos_in_file % DEFAULT_AIO_FILE_BLOCK_SIZE;
if ((bytes_read < 0) || (static_cast<size_t>(bytes_read) < region_left_padding))
{
got_exception = true;
throw Exception("Asynchronous read error on file " + filename, ErrorCodes::AIO_READ_ERROR);
}
}
bool ReadBufferAIO::waitForAIOCompletion()
{
if (is_eof || !is_pending_read)
return false;
while (io_getevents(aio_context.ctx, events.size(), events.size(), &events[0], nullptr) < 0)
{
if (errno != EINTR)
{
got_exception = true;
throw Exception("Failed to wait for asynchronous IO completion on file " + filename, ErrorCodes::AIO_COMPLETION_ERROR);
}
}
is_pending_read = false;
bytes_read = events[0].res;
return true;
}
void ReadBufferAIO::swapBuffers() noexcept
{
internalBuffer().swap(fill_buffer.internalBuffer());
buffer().swap(fill_buffer.buffer());
std::swap(position(), fill_buffer.position());
}
void ReadBufferAIO::initRequest()
{
/// Количество запрашиваемых байтов.
@ -235,71 +291,4 @@ void ReadBufferAIO::publishReceivedData()
is_eof = true;
}
void ReadBufferAIO::sync()
{
if (is_eof || !is_pending_read)
return;
waitForAIOCompletion();
swapBuffers();
}
void ReadBufferAIO::skipLastRequest()
{
if (is_eof || !is_pending_read)
return;
while (io_getevents(aio_context.ctx, events.size(), events.size(), &events[0], nullptr) < 0)
{
if (errno != EINTR)
{
got_exception = true;
throw Exception("Failed to wait for asynchronous IO completion on file " + filename, ErrorCodes::AIO_COMPLETION_ERROR);
}
}
is_pending_read = false;
is_started = false;
/// Несмотря на то, что не станем использовать результат последнего запроса,
/// убедимся, что запрос правильно выполнен.
bytes_read = events[0].res;
size_t region_left_padding = pos_in_file % DEFAULT_AIO_FILE_BLOCK_SIZE;
if ((bytes_read < 0) || (static_cast<size_t>(bytes_read) < region_left_padding))
{
got_exception = true;
throw Exception("Asynchronous read error on file " + filename, ErrorCodes::AIO_READ_ERROR);
}
}
void ReadBufferAIO::waitForAIOCompletion()
{
if (!is_pending_read)
return;
while (io_getevents(aio_context.ctx, events.size(), events.size(), &events[0], nullptr) < 0)
{
if (errno != EINTR)
{
got_exception = true;
throw Exception("Failed to wait for asynchronous IO completion on file " + filename, ErrorCodes::AIO_COMPLETION_ERROR);
}
}
is_pending_read = false;
bytes_read = events[0].res;
publishReceivedData();
}
void ReadBufferAIO::swapBuffers() noexcept
{
internalBuffer().swap(fill_buffer.internalBuffer());
buffer().swap(fill_buffer.buffer());
std::swap(position(), fill_buffer.position());
}
}